给你一个该思路的实现，希望对你有帮助。
// Background poller that cleans up / renames finished "-*" output directories under
// ProjectConf.resultPath.
new Thread("Thread to update HDFS File Name") {

  import java.nio.charset.StandardCharsets
  import scala.util.control.NonFatal

  /** Marker file that must be present before a directory is processed. */
  private val SuccessMarker = "_SUCCESS"

  /**
   * Every `ProjectConf.renameHDFSFileDuration` seconds, scans `ProjectConf.resultPath` and hands
   * each entry whose name starts with "-" to `processDirectory`.
   *
   * A failing pass is printed and the next pass still runs after the normal sleep. Interrupting
   * the thread ends the loop.
   */
  override def run(): Unit = {
    val conf = new org.apache.hadoop.conf.Configuration()
    // 1000L: convert seconds to milliseconds in Long arithmetic so it cannot overflow.
    val sleepMillis = ProjectConf.renameHDFSFileDuration * 1000L
    try {
      while (!isInterrupted()) {
        runOnePass(conf)
        // Sleep outside the pass's error handling, so a failing pass cannot become a busy loop.
        Thread.sleep(sleepMillis)
      }
    } catch {
      case _: InterruptedException =>
        // Asked to stop: keep the interrupt status and let the thread finish.
        Thread.currentThread().interrupt()
    }
  }

  /** One scan of `ProjectConf.resultPath`; non-fatal failures are printed, not propagated. */
  private def runOnePass(conf: org.apache.hadoop.conf.Configuration): Unit =
    try {
      // newInstance (unlike FileSystem.get) returns an uncached client. Closing it therefore
      // cannot close the shared, cached FileSystem that other code in this JVM may be using.
      val hdfs = FileSystem.newInstance(conf)
      try {
        hdfs.listStatus(new Path(ProjectConf.resultPath))
          .filter(_.getPath.getName.startsWith("-"))
          .foreach(entry => processSafely(hdfs, entry.getPath))
      } finally {
        hdfs.close()
      }
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }

  /** Processes one directory so that its failure does not stop the rest of the pass. */
  private def processSafely(hdfs: FileSystem, dir: Path): Unit =
    try processDirectory(hdfs, dir)
    catch { case NonFatal(e) => e.printStackTrace() }

  /**
   * Handles one candidate directory, but only once it contains `_SUCCESS`:
   *  - if no data file has a non-zero length, the whole directory is deleted;
   *  - otherwise the first line of the first non-empty data file is read. If its first
   *    comma-separated field has at least 10 characters, the directory is renamed to
   *    `mr_<those 10 chars>_<currentTimeMillis>` in the same parent directory.
   * Finally, `_SUCCESS` is deleted if the directory still exists under its old name.
   */
  private def processDirectory(hdfs: FileSystem, dir: Path): Unit = {
    val children = hdfs.listStatus(dir)
    if (children.exists(_.getPath.getName == SuccessMarker)) {
      // Names starting with "_" or "." are Hadoop bookkeeping (markers, .crc files), not data.
      val dataFiles = children.filterNot { status =>
        val name = status.getPath.getName
        name.startsWith("_") || name.startsWith(".")
      }
      // Check every data file, not just the first, so that a directory whose first part file
      // is empty but whose later parts hold data is not deleted.
      dataFiles.find(_.getLen > 0) match {
        case None =>
          hdfs.delete(dir, true)
        case Some(dataFile) =>
          readHourPrefix(hdfs, dataFile.getPath) match {
            case Some(timeH) =>
              // Resolve against the real parent path rather than concatenating strings, so a
              // resultPath without a trailing "/" still yields the right target.
              val target = new Path(dir.getParent, s"mr_${timeH}_${System.currentTimeMillis()}")
              if (!hdfs.rename(dir, target)) System.err.println(s"Failed to rename $dir to $target")
            case None =>
              System.err.println(s"No usable first line in ${dataFile.getPath}; leaving $dir as is")
          }
      }
      // NOTE(review): after a successful rename or delete this path no longer exists. The marker
      // is therefore only removed from directories left in place, which stops them being
      // re-processed on every pass. Renamed directories keep their `_SUCCESS`; confirm that is
      // intended.
      val marker = new Path(dir, SuccessMarker)
      if (hdfs.exists(marker)) hdfs.delete(marker, false)
    }
  }

  /**
   * Returns the first 10 characters of the first comma-separated field on the file's first line.
   * Returns None when the line is missing or empty, or when that field is shorter than 10
   * characters.
   * Presumably a yyyyMMddHH-style timestamp (the caller names it `timeH`); TODO confirm.
   */
  private def readHourPrefix(hdfs: FileSystem, file: Path): Option[String] = {
    val reader =
      new BufferedReader(new InputStreamReader(hdfs.open(file), StandardCharsets.UTF_8))
    try {
      Option(reader.readLine())
        .filter(line => StrUtil.isNotEmpty(line))
        .flatMap(_.split(",").headOption)
        .filter(_.length >= 10)
        .map(_.substring(0, 10))
    } finally {
      reader.close()
    }
  }
}.start()