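A few days ago a DW user reported that inserting data into a table (an RCFile table) with "insert overwrite table partition(xx) select ..." produced duplicate files. Looking at the job log, I found that map task 000005 had started two task attempts, the second one being a speculative execution, and that both attempts renamed their temp file to the final file inside the task's close function. Neither went through the MapReduce framework's two-phase commit protocol, in which a task commits only after receiving the commitTaskAction sent by the tasktracker, which is what guarantees that only one attempt's output becomes the final result.
The task log shows: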
renamed path hdfs:// to hdfs:// . File size is 666922
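To make the contrast concrete, here is a minimal sketch of the arbitration that a commit protocol provides: every attempt asks for permission before publishing its output, and only one attempt per task is granted it. This is not Hadoop code; `CommitArbiter`, `canCommit`, and the attempt names are hypothetical and only illustrate the idea.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not Hadoop code: grants the commit to one attempt per task.
class CommitArbiter {
  // task id -> attempt id that was allowed to commit
  private final Map<String, String> committed = new HashMap<String, String>();

  // Returns true only for the first attempt of a task that asks to commit
  // (and for that same attempt if it asks again).
  synchronized boolean canCommit(String taskId, String attemptId) {
    String winner = committed.get(taskId);
    if (winner == null) {
      committed.put(taskId, attemptId);
      return true;
    }
    return winner.equals(attemptId);
  }

  public static void main(String[] args) {
    CommitArbiter arbiter = new CommitArbiter();
    System.out.println(arbiter.canCommit("000005", "attempt_0")); // true
    System.out.println(arbiter.canCommit("000005", "attempt_1")); // false
  }
}
```

A mapper that renames its file in close without asking anyone skips this step entirely, which is why both attempts can end up publishing a file.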
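This Hive statement normally launches only one job (Launching Job 1 out of 1). When the first job finishes, a conditional task checks the average file size under each partition. If the average is below hive.merge.smallfiles.avgsize (default 16MB), the first job is a map-only job, and hive.merge.mapfiles is enabled (default true), a second merge-file job is launched to merge the small files. The RCFileMergeMapper used by this second job is what merges the small files generated earlier.
There are two workarounds. One is to turn off speculative execution, at the risk that a single slow task becomes a bottleneck. The other is to turn off the merge-file job (set hive.merge.mapfiles=false) so that RCFileMergeMapper is never used, but then the large number of small files cannot be merged.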
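The size check made by the conditional task can be sketched roughly as follows. This is not Hive code: `MergeDecision` and `shouldMerge` are hypothetical names, the threshold is passed in as a plain number, and only the map-only case described here is modelled.

```java
// Hypothetical illustration of the conditional-task check; not Hive code.
class MergeDecision {
  // "16MB" from the text, taken here as 16,000,000 bytes (an assumption about the exact value).
  static final long AVG_SIZE_THRESHOLD = 16L * 1000 * 1000;

  // Returns true when a merge-file job would be launched for one partition.
  static boolean shouldMerge(long[] fileSizes, long avgSizeThreshold,
      boolean mapOnlyJob, boolean mergeMapFiles) {
    if (!mapOnlyJob || !mergeMapFiles || fileSizes.length == 0) {
      return false;
    }
    long total = 0;
    for (long size : fileSizes) {
      total += size;
    }
    long average = total / fileSizes.length;
    return average < avgSizeThreshold;
  }

  public static void main(String[] args) {
    long[] small = {666922L, 700000L};
    long[] large = {64L * 1000 * 1000};
    System.out.println(shouldMerge(small, AVG_SIZE_THRESHOLD, true, true));  // true
    System.out.println(shouldMerge(large, AVG_SIZE_THRESHOLD, true, true));  // false
    System.out.println(shouldMerge(small, AVG_SIZE_THRESHOLD, true, false)); // false
  }
}
```

The last call corresponds to the second workaround: with hive.merge.mapfiles turned off, no merge job is started no matter how small the files are.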
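If the check decides that a merge job is needed, a BlockMergeTask (a subclass of Task) is created and its execute method is run. It first sets the relevant JobConf parameters, such as mapred.mapper.class and hive.rcfile.merge.output.dir, then creates a JobClient and calls submitJob. The map logic lives in the RCFileMergeMapper class, which extends the abstract class MapReduceBase from the old MapRed API and overrides the configure and close methods. The rename mentioned above happens in close: the run method of the MapRunner class calls the mapper's map method in a loop and calls the mapper's close method in a finally block.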
public void close() throws IOException {
  // close writer
  if (outWriter == null) {
    return;
  }

  outWriter.close();
  outWriter = null;

  if (!exception) {
    FileStatus fss = fs.getFileStatus(outPath);
    LOG.info("renamed path " + outPath + " to " + finalPath
        + " . File size is " + fss.getLen());
    // every attempt that reaches this point publishes its own file
    if (!fs.rename(outPath, finalPath)) {
      throw new IOException("Unable to rename output to " + finalPath);
    }
  } else {
    if (!autoDelete) {
      fs.delete(outPath, true);
    }
  }
}
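The effect of renaming in close can be reproduced with a small simulation. The sketch below is not Hadoop or Hive code: `RenamingMapper`, `run`, and the file names `000005_0` and `000005_1` are hypothetical, and a plain list stands in for the output directory. The runner has the same shape as the loop described above (map every record, then close in finally), so two attempts of one task each publish a file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Hadoop or Hive code.
class CloseInFinallyDemo {

  // Stand-in for a mapper whose close() publishes its own output file.
  static class RenamingMapper {
    private final String attemptFile;
    private final List<String> outputDir;

    RenamingMapper(String attemptFile, List<String> outputDir) {
      this.attemptFile = attemptFile;
      this.outputDir = outputDir;
    }

    void map(String record) {
      // A real mapper would append the record to its temp file here.
    }

    void close() {
      // Models fs.rename(outPath, finalPath): the file becomes visible immediately.
      outputDir.add(attemptFile);
    }
  }

  // Map every record, then close in finally, whatever happened before.
  static void run(RenamingMapper mapper, List<String> records) {
    try {
      for (String record : records) {
        mapper.map(record);
      }
    } finally {
      mapper.close();
    }
  }

  public static void main(String[] args) {
    List<String> outputDir = new ArrayList<String>();
    List<String> records = Arrays.asList("r1", "r2");
    // Two attempts of the same task: the original and a speculative one.
    run(new RenamingMapper("000005_0", outputDir), records);
    run(new RenamingMapper("000005_1", outputDir), records);
    System.out.println(outputDir); // [000005_0, 000005_1]
  }
}
```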
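After the job finishes, result files from different attempts of the same task may exist side by side. Hive clearly anticipated this: after the merge job runs, it calls RCFileMergeMapper.jobClose, which first backs up the output directory, then writes the data into the output directory and calls Utilities.removeTempOrDuplicateFiles to delete duplicate files. The deletion logic extracts the taskid from each file name, and if two files share a taskid, the smaller one is deleted. In version 0.9, however, RCFileMergeMapper does not support the case where the target table is a dynamic-partition table, so duplicate files still remain. Applying the patch (https://issues.apache.org/jira/browse/HIVE-3149?attachmentOrder=asc ) solves the problem.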
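The finally block of the execute method (BlockMergeTask.execute, which is where RCFileMergeMapper.jobClose gets called). The original source catches the exception and does nothing with it; I added stack trace output and set the return value: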
finally {
  try {
    if (ctxCreated) {
      ctx.clear();
    }
    if (rj != null) {
      if (returnVal != 0) {
        rj.killJob();
      }
      jobID = rj.getID().toString();
    }
    RCFileMergeMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx());
  } catch (Exception e) {
    // added: report the failure instead of swallowing it
    console.printError("RCFile Merger Job Close Error", "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    success = false;
    returnVal = -500;
  }
}
public static void jobClose(String outputPath, boolean success, JobConf job,
    LogHelper console, DynamicPartitionCtx dynPartCtx) throws HiveException, IOException {
  Path outpath = new Path(outputPath);
  FileSystem fs = outpath.getFileSystem(job);
  Path backupPath = backupOutputPath(fs, outpath, job);
  Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null);
  if (backupPath != null) {
    fs.delete(backupPath, true);
  }
}
public static void mvFileToFinalPath(String specPath, Configuration hconf,
    boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,
    HiveException {
  FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
  Path tmpPath = Utilities.toTempPath(specPath);
  Path taskTmpPath = Utilities.toTaskTempPath(specPath);
  Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
      + ".intermediate");
  Path finalPath = new Path(specPath);
  if (success) {
    if (fs.exists(tmpPath)) {
      // Step1: rename tmp output folder to intermediate path. After this
      // point, updates from speculative tasks still writing to tmpPath
      // will not appear in finalPath.
      log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
      Utilities.rename(fs, tmpPath, intermediatePath);
      // Step2: remove any tmp file or double-committed output files
      ArrayList<String> emptyBuckets =
          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
      // create empty buckets if necessary
      if (emptyBuckets.size() > 0) {
        createEmptyBuckets(hconf, emptyBuckets, conf);
      }
      // Step3: move to the file destination
      log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
      Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
    }
  } else {
    fs.delete(tmpPath, true);
  }
  fs.delete(taskTmpPath, true);
}
removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx): dynamic-partition and non-dynamic-partition tables are handled differently
/**
 * Remove all temporary files and duplicate (double-committed) files from a given directory.
 *
 * @return a list of path names corresponding to should-be-created empty buckets.
 */
public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,
    DynamicPartitionCtx dpCtx) throws IOException {
  if (path == null) {
    return null;
  }
  ArrayList<String> result = new ArrayList<String>();
  if (dpCtx != null) {
    FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
    HashMap<String, FileStatus> taskIDToFile = null;
    for (int i = 0; i < parts.length; ++i) {
      assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
          + " is not a direcgtory";
      FileStatus[] items = fs.listStatus(parts[i].getPath());
      // remove empty directory since DP insert should not generate empty partitions.
      // empty directories could be generated by crashed Task/ScriptOperator
      if (items.length == 0) {
        if (!fs.delete(parts[i].getPath(), true)) {
          LOG.error("Cannot delete empty directory " + parts[i].getPath());
          throw new IOException("Cannot delete empty directory " + parts[i].getPath());
        }
      }
      taskIDToFile = removeTempOrDuplicateFiles(items, fs);
      // if the table is bucketed and enforce bucketing, we should check and generate all buckets
      if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {
        // refresh the file list
        items = fs.listStatus(parts[i].getPath());
        // get the missing buckets and generate empty buckets
        String taskID1 = taskIDToFile.keySet().iterator().next();
        Path bucketPath = taskIDToFile.values().iterator().next().getPath();
        for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {
          String taskID2 = replaceTaskId(taskID1, j);
          if (!taskIDToFile.containsKey(taskID2)) {
            // create empty bucket, file name should be derived from taskID2
            String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
            result.add(path2);
          }
        }
      }
    }
  } else {
    FileStatus[] items = fs.listStatus(path);
    removeTempOrDuplicateFiles(items, fs);
  }
  return result;
}
removeTempOrDuplicateFiles(FileStatus[] items, FileSystem fs): within each directory, deduplicates files that have the same taskid but different attemptids
public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,
    FileSystem fs) throws IOException {
  if (items == null || fs == null) {
    return null;
  }
  HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
  for (FileStatus one : items) {
    if (isTempPath(one)) {
      if (!fs.delete(one.getPath(), true)) {
        throw new IOException("Unable to delete tmp file: " + one.getPath());
      }
    } else {
      String taskId = getTaskIdFromFilename(one.getPath().getName());
      FileStatus otherFile = taskIdToFile.get(taskId);
      if (otherFile == null) {
        taskIdToFile.put(taskId, one);
      } else {
        // Compare the file sizes of all the attempt files for the same task, the largest win
        // any attempt files could contain partial results (due to task failures or
        // speculative runs), but the largest should be the correct one since the result
        // of a successful run should never be smaller than a failed/speculative run.
        FileStatus toDelete = null;
        if (otherFile.getLen() >= one.getLen()) {
          toDelete = one;
        } else {
          toDelete = otherFile;
          taskIdToFile.put(taskId, one);
        }
        long len1 = toDelete.getLen();
        long len2 = taskIdToFile.get(taskId).getLen();
        if (!fs.delete(toDelete.getPath(), true)) {
          throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
              + ". Existing file: " + taskIdToFile.get(taskId).getPath());
        } else {
          LOG.info("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
              + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length "
              + len2);
        }
      }
    }
  }
  return taskIdToFile;
}
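The "largest file per taskid wins" rule is easy to try out without HDFS. The sketch below is a simplified stand-in, not Hive code: it works on a map of file name to length instead of FileStatus objects, and it assumes file names of the form `<taskid>_<attemptid>` such as `000005_0`. The class `DedupByTaskId`, its methods, and the regular expression are all hypothetical; Hive's own getTaskIdFromFilename may parse names differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the dedup rule; not Hive code.
class DedupByTaskId {
  // Assumed naming scheme "<taskId>_<attemptId>", e.g. "000005_0".
  private static final Pattern NAME = Pattern.compile("^(\\d+)_(\\d+)$");

  // Falls back to the whole name when it does not match the assumed scheme.
  static String taskIdOf(String fileName) {
    Matcher m = NAME.matcher(fileName);
    return m.matches() ? m.group(1) : fileName;
  }

  // Input: file name -> length. Output: task id -> name of the surviving file.
  static Map<String, String> keepLargestPerTask(Map<String, Long> files) {
    Map<String, String> winner = new LinkedHashMap<String, String>();
    for (Map.Entry<String, Long> e : files.entrySet()) {
      String taskId = taskIdOf(e.getKey());
      String current = winner.get(taskId);
      // Mirrors the ">=" comparison: on a tie the file seen first is kept.
      if (current == null || files.get(current) < e.getValue()) {
        winner.put(taskId, e.getKey());
      }
    }
    return winner;
  }

  public static void main(String[] args) {
    Map<String, Long> files = new LinkedHashMap<String, Long>();
    files.put("000005_0", 666922L);
    files.put("000005_1", 600000L);
    files.put("000006_0", 100L);
    System.out.println(keepLargestPerTask(files)); // {000005=000005_0, 000006=000006_0}
  }
}
```

Any file that is not in the returned map is one the real method would delete.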
