分享

基于HIVE文件格式的map reduce代码编写

yuwenge 发表于 2013-12-13 17:56:48 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 2 8896

我们的数据绝大多数都是在HIVE上,对HIVE的SEQUENCEFILE和RCFILE的存储格式都有利用,为了满足HIVE的数据开放,hive client的方式就比较单一,直接访问HIVE生成的HDFS数据也是一种必要途径,所以本文整理测试了如何编写基于TEXTFILE、SEQUENCEFILE、RCFILE的数据的map reduce的代码。以wordcount的逻辑展示3种MR的代码。


其实只要知道MAP的输入格式是什么,就知道如何在MAP中处理数据;只要知道REDUCE(也可能只有MAP)的输出格式,就知道如何把处理结果转成输出格式。

表1:
0713372425.png
如下代码片段是运行一个MR的最简单的配置:

  1. //map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
  2. JobConf conf = new JobConf(WordCountRC.class);
  3. //设置一个用户定义的job名称
  4. conf.setJobName("WordCountRC");
  5. //为job的输出数据设置Key类
  6. conf.setOutputKeyClass(Text.class);
  7. //为job输出设置value类
  8. conf.setOutputValueClass(IntWritable.class);
  9. //为job设置Mapper类
  10. conf.setMapperClass(MapClass.class);
  11. //为job设置Combiner类
  12. conf.setCombinerClass(Reduce.class);
  13. //为job设置Reduce类
  14. conf.setReducerClass(Reduce.class);
  15. //为map-reduce任务设置InputFormat实现类
  16. conf.setInputFormat(RCFileInputFormat.class);
  17. //为map-reduce任务设置OutputFormat实现类
  18. conf.setOutputFormat(TextOutputFormat.class);
  19. //为map-reduce job设置路径数组作为输入列表
  20. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  21. //为map-reduce job设置路径数组作为输出列表
  22. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  23. //运行一个job
  24. JobClient.runJob(conf);</font></font>
复制代码

而此刻,我们更多的是关注配置InputFormat和OutputFormat的setInputFormat和setOutputFormat。根据我们不同的输入输出做相应的配置,可以选择表1的任何格式。
当我们确定了输入输出格式,接下来就是来在实现map和reduce函数时首选对输入格式做相应的处理,然后处理具体的业务逻辑,最后把处理后的数据转成既定的输出格式。


如下是处理textfile、sequencefile、rcfile输入文件的wordcount代码,大家可以比较一下具体区别,应该就能处理更多其它输入文件或者输出文件格式的数据。
代码1:textfile版wordcount

  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapred.FileInputFormat;
  9. import org.apache.hadoop.mapred.FileOutputFormat;
  10. import org.apache.hadoop.mapred.JobClient;
  11. import org.apache.hadoop.mapred.JobConf;
  12. import org.apache.hadoop.mapred.MapReduceBase;
  13. import org.apache.hadoop.mapred.Mapper;
  14. import org.apache.hadoop.mapred.OutputCollector;
  15. import org.apache.hadoop.mapred.Reducer;
  16. import org.apache.hadoop.mapred.Reporter;
  17. public class WordCountTxt{
  18.   public static class MapClass extends MapReduceBase
  19.     implements Mapper<LongWritable, Text, Text, IntWritable> {
  20.    
  21.     private final static IntWritable one = new IntWritable(1);
  22.     private Text word = new Text();
  23.    
  24.        @Override
  25.        public void map(LongWritable key, Text value,
  26.                      OutputCollector<Text, IntWritable> output,
  27.             Reporter reporter) throws IOException {
  28.               String line = value.toString();
  29.               StringTokenizer itr = new StringTokenizer(line);
  30.               while (itr.hasMoreTokens()) {
  31.                      word.set(itr.nextToken());
  32.                      output.collect(word, one);
  33.               }
  34.   }
  35.   }
  36.   public static class Reduce extends MapReduceBase
  37.     implements Reducer<Text, IntWritable, Text, IntWritable> {
  38.    
  39.        @Override
  40.     public void reduce(Text key, Iterator<IntWritable> values,
  41.                        OutputCollector<Text, IntWritable> output,
  42.                        Reporter reporter) throws IOException {
  43.       int sum = 0;
  44.       while (values.hasNext()) {
  45.         sum += values.next().get();
  46.       }
  47.       output.collect(key, new IntWritable(sum));
  48.     }
  49.   }
  50.   public static void main(String[] args) throws Exception {
  51.          JobConf conf = new JobConf(WordCountTxt.class);
  52.          conf.setJobName("wordcounttxt");
  53.         
  54.          conf.setOutputKeyClass(Text.class);
  55.          conf.setOutputValueClass(IntWritable.class);
  56.         
  57.          conf.setMapperClass(MapClass.class);
  58.          conf.setCombinerClass(Reduce.class);
  59.          conf.setReducerClass(Reduce.class);
  60.         
  61.          FileInputFormat.setInputPaths(conf, new Path(args[0]));
  62.          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  63.               
  64.          JobClient.runJob(conf);   
  65.   }
  66.   
  67. }
  68. 代码2:sequencefile版wordcount
  69. import java.io.IOException;
  70. import java.util.Iterator;
  71. import java.util.StringTokenizer;
  72. import org.apache.hadoop.fs.Path;
  73. import org.apache.hadoop.io.IntWritable;
  74. import org.apache.hadoop.io.Text;
  75. import org.apache.hadoop.mapred.FileInputFormat;
  76. import org.apache.hadoop.mapred.FileOutputFormat;
  77. import org.apache.hadoop.mapred.JobClient;
  78. import org.apache.hadoop.mapred.JobConf;
  79. import org.apache.hadoop.mapred.MapReduceBase;
  80. import org.apache.hadoop.mapred.Mapper;
  81. import org.apache.hadoop.mapred.OutputCollector;
  82. import org.apache.hadoop.mapred.Reducer;
  83. import org.apache.hadoop.mapred.Reporter;
  84. import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
  85. import org.apache.hadoop.mapred.TextOutputFormat;
  86. public class WordCountSeq {
  87.          public static class MapClass extends MapReduceBase
  88.            implements Mapper<Text, Text, Text, IntWritable> {
  89.          
  90.            private final static IntWritable one = new IntWritable(1);
  91.            private Text word = new Text();
  92.          
  93.               @Override
  94.               public void map(Text key, Text value,
  95.                            OutputCollector<Text, IntWritable> output,
  96.                    Reporter reporter) throws IOException {
  97.                      String line = value.toString();
  98.                      StringTokenizer itr = new StringTokenizer(line);
  99.                      while (itr.hasMoreTokens()) {
  100.                            word.set(itr.nextToken());
  101.                            output.collect(word, one);
  102.                      }
  103.          }
  104.          }
  105.          public static class Reduce extends MapReduceBase
  106.            implements Reducer<Text, IntWritable, Text, IntWritable> {
  107.          
  108.               @Override
  109.            public void reduce(Text key, Iterator<IntWritable> values,
  110.                               OutputCollector<Text, IntWritable> output,
  111.                               Reporter reporter) throws IOException {
  112.              int sum = 0;
  113.              while (values.hasNext()) {
  114.                sum += values.next().get();
  115.              }
  116.              output.collect(key, new IntWritable(sum));
  117.            }
  118.          }
  119.          /**
  120.           * @param args
  121.         * @throws IOException
  122.           */
  123.          public static void main(String[] args) throws IOException {
  124.               // TODO Auto-generated method stub
  125.                 JobConf conf = new JobConf(WordCountSeq.class);
  126.                 conf.setJobName("wordcountseq");
  127.                
  128.                 conf.setOutputKeyClass(Text.class);
  129.                 conf.setOutputValueClass(IntWritable.class);
  130.                
  131.                 conf.setMapperClass(MapClass.class);
  132.                 conf.setCombinerClass(Reduce.class);
  133.                 conf.setReducerClass(Reduce.class);
  134.                
  135.                 conf.setInputFormat(SequenceFileAsTextInputFormat.class);
  136.                 conf.setOutputFormat(TextOutputFormat.class);
  137.                
  138.                 FileInputFormat.setInputPaths(conf, new Path(args[0]));
  139.                 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  140.                      
  141.                 JobClient.runJob(conf);
  142.          }
  143. }
  144. 代码3:rcfile版wordcount
  145. import java.io.IOException;
  146. import java.util.Iterator;
  147. import org.apache.hadoop.fs.Path;
  148. import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
  149. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
  150. import org.apache.hadoop.io.IntWritable;
  151. import org.apache.hadoop.io.LongWritable;
  152. import org.apache.hadoop.io.Text;
  153. import org.apache.hadoop.mapred.FileInputFormat;
  154. import org.apache.hadoop.mapred.FileOutputFormat;
  155. import org.apache.hadoop.mapred.JobClient;
  156. import org.apache.hadoop.mapred.JobConf;
  157. import org.apache.hadoop.mapred.MapReduceBase;
  158. import org.apache.hadoop.mapred.Mapper;
  159. import org.apache.hadoop.mapred.OutputCollector;
  160. import org.apache.hadoop.mapred.Reducer;
  161. import org.apache.hadoop.mapred.Reporter;
  162. import org.apache.hadoop.mapred.TextOutputFormat;
  163. public class WordCountRC {
  164.    
  165.      public static class MapClass
  166.           extends MapReduceBase implements Mapper<LongWritable, BytesRefArrayWritable, Text, IntWritable> {
  167.          
  168.           private final static IntWritable one = new IntWritable(1);
  169.           private Text word =new Text();
  170.    
  171.           @Override
  172.           public void map(LongWritable key, BytesRefArrayWritable value,
  173.                     OutputCollector<Text, IntWritable> output, Reporter reporter)
  174.                     throws IOException {
  175.                Text txt = new Text();
  176.                txt.set(value.get(0).getData(), value.get(0).getStart(), value.get(0).getLength());
  177.                String[] result = txt.toString().split("\\s");
  178.                for(int i=0; i < result.length; i++){
  179.                     word.set(result[i]);
  180.                     output.collect(word, one);   
  181.                }
  182.           }         
  183.      }
  184.      public static class Reduce
  185.           extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
  186.    
  187.           private IntWritable result = new IntWritable();
  188.          
  189.           @Override
  190.           public void reduce(Text key, Iterator<IntWritable> value,
  191.                     OutputCollector<Text, IntWritable> output, Reporter reporter)
  192.                     throws IOException {
  193.                int sum = 0;
  194.                while (value.hasNext()) {
  195.                     sum += value.next().get();
  196.                }
  197.               
  198.                result.set(sum);
  199.                output.collect(key, result);              
  200.           }
  201.          
  202.      }
  203.      /**
  204.      * @param args
  205.      */
  206.      public static void main(String[] args) throws IOException{
  207.           JobConf conf = new JobConf(WordCountRC.class);
  208.           conf.setJobName("WordCountRC");
  209.          
  210.           conf.setOutputKeyClass(Text.class);
  211.           conf.setOutputValueClass(IntWritable.class);
  212.          
  213.           conf.setMapperClass(MapClass.class);
  214.           conf.setCombinerClass(Reduce.class);
  215.           conf.setReducerClass(Reduce.class);
  216.          
  217.           conf.setInputFormat(RCFileInputFormat.class);
  218.           conf.setOutputFormat(TextOutputFormat.class);
  219.          
  220.           FileInputFormat.setInputPaths(conf, new Path(args[0]));
  221.           FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  222.          
  223.           JobClient.runJob(conf);
  224.      }
  225. }
  226. 原始数据:
  227. hadoop fs -text /group/alidw-dev/seq_input/attempt_201201101606_2339628_m_000000_0
  228. 12/02/13 17:07:57 INFO util.NativeCodeLoader: Loaded the native-hadoop library
  229. 12/02/13 17:07:57 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
  230. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
  231. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
  232. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
  233. 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor
  234.         hello, i am ok. are you?
  235.         i am fine too!</font></font>
复制代码

编译打包完成后执行:

hadoop jarWordCountSeq.jar WordCountSeq /group/alidw-dev/seq_input/ /group/alidw-dev/rc_output



执行完毕就能看到最终结果:

  1. hadoop fs -cat /group/alidw-dev/seq_output/part-00000
  2. am      2
  3. are     1
  4. fine    1
  5. hello,  1
  6. i       2
  7. ok.     1
  8. too!    1
  9. you?    1</font></font>
复制代码

已有(2)人评论

跳转到指定楼层
qingfeng 发表于 2014-12-3 15:35:22
你好,想请教一个问题:我也是按照你的上面的方法写的,但不知道jar包有哪些?比如RCFileInputFormat和RCFileRecordReader还要将hive源代码中复制到java工程中,但是运行还是错误:Error: org/apache/hadoop/hive/ql/io/RCFile$Reader,谢谢!
回复

使用道具 举报

sstutu 发表于 2014-12-3 16:06:15
qingfeng 发表于 2014-12-3 15:35
你好,想请教一个问题:我也是按照你的上面的方法写的,但不知道jar包有哪些?比如RCFileInputFormat和RCFi ...


需要引用一些jar包,把它们导入工程中
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条