分享

mapreduce完成字符合并代码分享

nettman 发表于 2014-1-13 17:34:44 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 6969
现在有这样两个文件
a|b
a|c|d
通过mapreduce处理成
a|b|c|
  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.DataInputBuffer;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.NullWritable;
  7. import org.apache.hadoop.io.RawComparator;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
  16. public class plumebobo0001 {
  17.         public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
  18.                 Text myKey = new Text();
  19.                 Text myValue = new Text();
  20.                 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  21.                         if (!value.toString().contains("|"))
  22.                                 return;
  23.                         String[] myStr = value.toString().split("\\|");
  24.                         for (int i = 1; i < myStr.length; i++) {
  25.                                 myKey.set(myStr[0] + "|" + myStr[i]); //把数据放在Key中输出,value空
  26.                                 context.write(myKey, myValue);
  27.                         }
  28.                         
  29.                 }
  30.         }
  31.         public static class MyReducer extends Reducer<Text, Text, Text, NullWritable>
  32.         {
  33.                 Text myKey = new Text();
  34.                 NullWritable myValue;
  35.                 public void reduce(Text key, Iterable<Text> values, Context context)
  36.                                 throws IOException, InterruptedException {
  37.                         StringBuilder myStr = new StringBuilder("");
  38.                         
  39.                         //迭代取出Key中的数据
  40.                         //重写了grouping,所以这里不用再作二次排序
  41.                         for (Text val : values) {
  42.                                 if (myStr.length() == 0) {
  43.                                         myStr.append(key.toString());
  44.                                 }
  45.                                 else {
  46.                                         myStr.append("|");
  47.                                         myStr.append(key.toString().split("\\|")[1]);
  48.                                 }
  49.                         }
  50.                         myKey.set(myStr.toString());
  51.                         context.write(myKey, myValue);
  52.                 }
  53.         }
  54.         public static void main(String[] args) throws Exception {
  55.                 String Oarg[] = new String[2];
  56.                 Oarg[0] = "/tmp/plumebobo/test0001";
  57.                 Oarg[1] = "/tmp/plumebobo/out001";
  58.                 Configuration conf = new Configuration();
  59.                 conf.set("mapred.job.tracker", "m04.ct1.r01.hdp:9001");
  60.                 Job job = new Job(conf, "plumebobo0001");
  61.                 job.setJarByClass(plumebobo0001.class);
  62.                 job.setMapperClass(MyMapper.class);
  63.                 job.setReducerClass(MyReducer.class);
  64.                 job.setNumReduceTasks(1);
  65.                 job.setOutputFormatClass(TextOutputFormat.class);
  66.                 job.setMapOutputKeyClass(Text.class);
  67.                 job.setMapOutputValueClass(Text.class);
  68.                 job.setOutputKeyClass(Text.class);
  69.                 job.setOutputValueClass(NullWritable.class);
  70.                 job.setPartitionerClass(MyPartitioner.class);
  71.                 job.setGroupingComparatorClass(MyGroupingComparator.class);
  72.                 FileInputFormat.setInputPaths(job, new Path(Oarg[0]));
  73.                 FileOutputFormat.setOutputPath(job, new Path(Oarg[1]));
  74.                 job.waitForCompletion(true);
  75.         }
  76. }
  77. // 根据第一列 分区
  78. class MyPartitioner extends HashPartitioner<Text, Text>
  79. {
  80.         @Override
  81.         public int getPartition(Text key, Text value, int numPartitions) {
  82.                 Text cols = new Text(key.toString().split("\\|")[0]);
  83.                 return super.getPartition(cols, value, numPartitions);// cols[0]
  84.         }
  85. }
  86. // 以第一列 值 分组
  87. class MyGroupingComparator implements RawComparator<Text>
  88. {
  89.         // @Override
  90.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  91.                 Text key1 = new Text();
  92.                 Text key2 = new Text();
  93.                 DataInputBuffer buffer = new DataInputBuffer();
  94.                 try {
  95.                         buffer.reset(b1, s1, l1);
  96.                         key1.readFields(buffer);
  97.                         buffer.reset(b2, s2, l2);
  98.                         key2.readFields(buffer);
  99.                 }
  100.                 catch (IOException e) {
  101.                         throw new RuntimeException(e);
  102.                 }
  103.                 String str1 = key1.toString().split("\\|")[0];
  104.                 String str2 = key2.toString().split("\\|")[0];
  105.                 return str1.compareTo(str2);
  106.         }
  107.         public int compare(Text o1, Text o2) {
  108.                 return 0;
  109.         }
  110. }
复制代码
加微信w3aboutyun,可拉入技术爱好者群

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条