分享

Hive RCFile合并作业产生重复数据问题

lzw 2014-8-29 20:26:55 发表于 问题解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 10376

问题导读:
hive插入数据产生重复数据的原因是什么?
hive是如何解决的?







前几天有DW用户反馈,在往一张表(RCFile表)中用“insert overwrite table partition(xx) select ...” 插入数据的时候,会产生重复文件。看了下这个作业log,发现map task 000005起了两个task attempt ,第二个attempt是推测执行,并且这两个attemp都在task close函数里面重命名temp文件成正式文件,而不是通过mapreduce框架的两阶段提交协议(two phrase commit protocol)在收到tasktracker发过来的commitTaskAction时再commit task来保证只有一个attemp的结果成为正式结果。

task log中的输出如下:


  1. attempt_201304111550_268224_m_000005_0  
  2. renamed path hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_task_tmp.-ext-10000/hp_cal_month=2013-04/_tmp.000005_0 to hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_tmp.-ext-10000/hp_cal_month=2013-04/000005_0 . File size is 666922  
  3.   
  4. attempt_201304111550_268234_m_000005_1  
  5. renamed path hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_task_tmp.-ext-10000/hp_cal_month=2013-04/_tmp.000005_1 to hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_tmp.-ext-10000/hp_cal_month=2013-04/000005_1 . File size is 666922  
复制代码

其实这条hive语句原本只会起1个job(Launching Job 1 out of 1),当第一个job结束的时候,会起一个conditional task分析每个partition下平均文件的大小,如果小于hive.merge.smallfiles.avgsize(默认为16MB), 第一个job又是map-only job,并且打开hive.merge.mapfiles(默认为true) ,则会另外起一个merge-file job来合并小文件,第二个job中用到RCFileMergeMapper就是合并之前生成的小文件的。workaround的方式有两种,一种是关闭推测执行,不过会有可能有一个task比较慢导致瓶颈,另一种是关闭merge file job(set hive.merge.mapfiles=fasle),从而不使用RCFileMergeMapper,不过这样产生大量的小文件就无法合并。


如果解析出来是要起merge job,会创建一个BlockMergeTask(继承自Task),执行里面的execute方法,先设置好JobConf相应的参数,比如mapred.mapper.class, hive.rcfile.merge.output.dir等,然后创建一个JobClient并submitJob,map的执行逻辑在RCFileMergeMapper类中,它继承了老的MapRed API中的抽象类MapReduceBase,覆盖了configure和close方法,前面提到的rename操作就是在close方法中,MapRunner类中的run方法会循环调用真正执行mapper的map方法,并在finally调用mapper的close方法


  1. public void close() throws IOException {  
  2.   // close writer  
  3.   if (outWriter == null) {  
  4.     return;  
  5.   }  
  6.   
  7.   outWriter.close();  
  8.   outWriter = null;  
  9.   
  10.   if (!exception) {  
  11.     FileStatus fss = fs.getFileStatus(outPath);  
  12.     LOG.info("renamed path " + outPath + " to " + finalPath  
  13.         + " . File size is " + fss.getLen());  
  14.     if (!fs.rename(outPath, finalPath)) {  
  15.       throw new IOException("Unable to rename output to " + finalPath);  
  16.     }  
  17.   } else {  
  18.     if (!autoDelete) {  
  19.       fs.delete(outPath, true);  
  20.     }  
  21.   }  
  22. }  
复制代码

job执行完成后,是有可能有同一task的不同attempt产生结果文件同时存在的,不过hive显然考虑到了这点,所以在merge作业执行后会调用RCFileMergeMapper.jobClose方法,它会先备份输出目录,然后将数据写入输出目录并调用Utilities.removeTempOrDuplicateFiles方法来删除重复文件,删除的逻辑是从文件名中提取taskid,如果同一个taskid有两个文件,则会将小的那个删除,不过在0.9版本中,RCFileMergeMapper对于目标表是动态分区表情况下不支持,所以还会有duplicated files,打上patch(https://issues.apache.org/jira/browse/HIVE-3149?attachmentOrder=asc)后解决问题

RCFileMergeMapper execute方法finally处理逻辑,源代码中catch住exception后无任何处理,我加了一些stack trace输出和设置return value


  1. finally {  
  2.       try {  
  3.         if (ctxCreated) {  
  4.           ctx.clear();  
  5.         }  
  6.         if (rj != null) {  
  7.           if (returnVal != 0) {  
  8.             rj.killJob();  
  9.           }  
  10.           HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());  
  11.           jobID = rj.getID().toString();  
  12.         }  
  13.         RCFileMergeMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx());  
  14.       } catch (Exception e) {  
  15.         console.printError("RCFile Merger Job Close Error", "\n"  
  16.             + org.apache.hadoop.util.StringUtils.stringifyException(e));  
  17.         e.printStackTrace(System.err);  
  18.         success = false;  
  19.         returnVal = -500;  
  20.       }  
  21.     }  
复制代码

jobClose方法

  1. public static void jobClose(String outputPath, boolean success, JobConf job,  
  2.     LogHelper console, DynamicPartitionCtx dynPartCtx) throws HiveException, IOException {  
  3.   Path outpath = new Path(outputPath);  
  4.   FileSystem fs = outpath.getFileSystem(job);  
  5.   Path backupPath = backupOutputPath(fs, outpath, job);  
  6.   Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null);  
  7.   if (backupPath != null) {  
  8.     fs.delete(backupPath, true);  
  9.   }  
  10. }  
复制代码

mvFileToFinalPath方法

  1. public static void mvFileToFinalPath(String specPath, Configuration hconf,  
  2.     boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException,  
  3.     HiveException {  
  4.   
  5.   FileSystem fs = (new Path(specPath)).getFileSystem(hconf);  
  6.   Path tmpPath = Utilities.toTempPath(specPath);  
  7.   Path taskTmpPath = Utilities.toTaskTempPath(specPath);  
  8.   Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()  
  9.       + ".intermediate");  
  10.   Path finalPath = new Path(specPath);  
  11.   if (success) {  
  12.     if (fs.exists(tmpPath)) {  
  13.       // Step1: rename tmp output folder to intermediate path. After this  
  14.       // point, updates from speculative tasks still writing to tmpPath  
  15.       // will not appear in finalPath.  
  16.       log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);  
  17.       Utilities.rename(fs, tmpPath, intermediatePath);  
  18.       // Step2: remove any tmp file or double-committed output files  
  19.       ArrayList<String> emptyBuckets =  
  20.           Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);  
  21.       // create empty buckets if necessary  
  22.       if (emptyBuckets.size() > 0) {  
  23.         createEmptyBuckets(hconf, emptyBuckets, conf);  
  24.       }  
  25.   
  26.       // Step3: move to the file destination  
  27.       log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);  
  28.       Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);  
  29.     }  
  30.   } else {  
  31.     fs.delete(tmpPath, true);  
  32.   }  
  33.   fs.delete(taskTmpPath, true);  
  34. }  
复制代码

removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx),动态分区和非动态分区表不同处理逻辑
  1. /**
  2.    * Remove all temporary files and duplicate (double-committed) files from a given directory.
  3.    *
  4.    * @return a list of path names corresponding to should-be-created empty buckets.
  5.    */  
  6.   public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path,  
  7.       DynamicPartitionCtx dpCtx) throws IOException {  
  8.     if (path == null) {  
  9.       return null;  
  10.     }  
  11.   
  12.     ArrayList<String> result = new ArrayList<String>();  
  13.     if (dpCtx != null) {  
  14.       FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);  
  15.       HashMap<String, FileStatus> taskIDToFile = null;  
  16.   
  17.       for (int i = 0; i < parts.length; ++i) {  
  18.         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()  
  19.             + " is not a direcgtory";  
  20.         FileStatus[] items = fs.listStatus(parts[i].getPath());  
  21.   
  22.         // remove empty directory since DP insert should not generate empty partitions.  
  23.         // empty directories could be generated by crashed Task/ScriptOperator  
  24.         if (items.length == 0) {  
  25.           if (!fs.delete(parts[i].getPath(), true)) {  
  26.             LOG.error("Cannot delete empty directory " + parts[i].getPath());  
  27.             throw new IOException("Cannot delete empty directory " + parts[i].getPath());  
  28.           }  
  29.         }  
  30.   
  31.         taskIDToFile = removeTempOrDuplicateFiles(items, fs);  
  32.         // if the table is bucketed and enforce bucketing, we should check and generate all buckets  
  33.         if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {  
  34.           // refresh the file list  
  35.           items = fs.listStatus(parts[i].getPath());  
  36.           // get the missing buckets and generate empty buckets  
  37.           String taskID1 = taskIDToFile.keySet().iterator().next();  
  38.           Path bucketPath = taskIDToFile.values().iterator().next().getPath();  
  39.           for (int j = 0; j < dpCtx.getNumBuckets(); ++j) {  
  40.             String taskID2 = replaceTaskId(taskID1, j);  
  41.             if (!taskIDToFile.containsKey(taskID2)) {  
  42.               // create empty bucket, file name should be derived from taskID2  
  43.               String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);  
  44.               result.add(path2);  
  45.             }  
  46.           }  
  47.         }  
  48.       }  
  49.     } else {  
  50.       FileStatus[] items = fs.listStatus(path);  
  51.       removeTempOrDuplicateFiles(items, fs);  
  52.     }  
  53.     return result;  
  54.   }
复制代码

removeTempOrDuplicateFiles(FileStatus[] items, FileSystem fs),对于每个目录下相同taskid不同attemptid的文件进行去重
  1. public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(FileStatus[] items,  
  2.     FileSystem fs) throws IOException {  
  3.   
  4.   if (items == null || fs == null) {  
  5.     return null;  
  6.   }  
  7.   
  8.   HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();  
  9.   
  10.   for (FileStatus one : items) {  
  11.     if (isTempPath(one)) {  
  12.       if (!fs.delete(one.getPath(), true)) {  
  13.         throw new IOException("Unable to delete tmp file: " + one.getPath());  
  14.       }  
  15.     } else {  
  16.       String taskId = getTaskIdFromFilename(one.getPath().getName());  
  17.       FileStatus otherFile = taskIdToFile.get(taskId);  
  18.       if (otherFile == null) {  
  19.         taskIdToFile.put(taskId, one);  
  20.       } else {  
  21.         // Compare the file sizes of all the attempt files for the same task, the largest win  
  22.         // any attempt files could contain partial results (due to task failures or  
  23.         // speculative runs), but the largest should be the correct one since the result  
  24.         // of a successful run should never be smaller than a failed/speculative run.  
  25.         FileStatus toDelete = null;  
  26.         if (otherFile.getLen() >= one.getLen()) {  
  27.           toDelete = one;  
  28.         } else {  
  29.           toDelete = otherFile;  
  30.           taskIdToFile.put(taskId, one);  
  31.         }  
  32.         long len1 = toDelete.getLen();  
  33.         long len2 = taskIdToFile.get(taskId).getLen();  
  34.         if (!fs.delete(toDelete.getPath(), true)) {  
  35.           throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()  
  36.               + ". Existing file: " + taskIdToFile.get(taskId).getPath());  
  37.         } else {  
  38.           LOG.info("Duplicate taskid file removed: " + toDelete.getPath() + " with length "  
  39.               + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length "  
  40.               + len2);  
  41.         }  
  42.       }  
  43.     }  
  44.   }  
  45.   return taskIdToFile;  
  46. }  
复制代码












链接http://blog.csdn.net/lalaguozhe/article/details/9095679
欢迎加入about云群9037177932227315139327136 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条