分享

【mapreduce进阶编程二】奇偶行分别求和

cknote 发表于 2014-9-30 11:21:24 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 9909


默认的TextInputFormat是按行输入,key为每行的起始偏移量,value为行的内容。这里实现key为行号,而不是偏移量。

0.环境:

hadoop2.4.0 四节点:master(192.168.137.100) slave1(192.168.137.101) slave2(192.168.137.102) slave3(192.168.137.103)
window8下通过eclipse的插件远程提交job到hadoop环境

1.描述:

   要求对数据按奇数行和偶数行分别求和
输入数据:
  1. 10
  2. 20
  3. 30
  4. 40
  5. 50
  6. 60
  7. 70
复制代码



预期输出:

  1. 奇数行之和:10 + 30 + 50 + 70 =         160
  2. 偶数行之和:20 + 40 + 60 =         120
复制代码



2.设计思路:
   为了实现奇数行和偶数行分别求和,我们首先需要取得行号。这里我们设计key为行号,默认的TextInputFormat的RecordReader为LineRecordReader,取出来的key为偏移量,所以我们要自定义自己的InputFormat和RecordReader。另外我们还要分别把奇数行和偶数行发送到同一个reduce(应该是reduce拉取数据,为了好理解),我们需要自定义我们自己的分区MyPartitioner。

3.代码实现:

3.1首先我们仿照LineRecordReader编写我们自己的MyRecordReader

  1. package com.cknote.hadoop.recordreader;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FSDataInputStream;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.InputSplit;
  10. import org.apache.hadoop.mapreduce.RecordReader;
  11. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. import org.apache.hadoop.util.LineReader;
  14. public class MyRecordReader extends RecordReader<LongWritable, Text> {
  15.         private long start;//分片开始偏移量
  16.         private long end;//分片结束偏移量
  17.         private long pos;//行号
  18.         private FSDataInputStream fin = null;
  19.         private LongWritable key = null;
  20.         private Text value = null;
  21.         private LineReader reader = null;
  22.        
  23.         /**
  24.          * 重要的方法
  25.          */
  26.         @Override
  27.         public void initialize(InputSplit inputSplit, TaskAttemptContext context)
  28.                         throws IOException, InterruptedException {
  29.                 FileSplit fileSplit = (FileSplit)inputSplit;
  30.                
  31.                 start = fileSplit.getStart();
  32.                 end = start + fileSplit.getLength();
  33.                
  34.                 Configuration conf = context.getConfiguration();
  35.                 Path path = fileSplit.getPath();
  36.                 FileSystem fs = path.getFileSystem(conf);
  37.                 fin = fs.open(path);
  38.                 fin.seek(start);
  39.                 reader = new LineReader(fin);
  40.                 pos = 1;
  41.                
  42.         }
  43.         /**
  44.          * 重要的方法
  45.          */
  46.         @Override
  47.         public boolean nextKeyValue() throws IOException, InterruptedException {
  48.                 if(key == null){
  49.                         key = new LongWritable();
  50.                 }
  51.                 key.set(pos);
  52.                 if(value == null){
  53.                         value = new Text();
  54.                 }
  55.                
  56.                 if(reader.readLine(value) == 0){
  57.                         return false;
  58.                 }
  59.                 pos++;
  60.                 return true;
  61.         }
  62.         @Override
  63.         public LongWritable getCurrentKey() throws IOException,
  64.                         InterruptedException {
  65.                 return key;
  66.         }
  67.         @Override
  68.         public Text getCurrentValue() throws IOException, InterruptedException {
  69.                 return value;
  70.         }
  71.         @Override
  72.         public float getProgress() throws IOException, InterruptedException {
  73.                
  74.                 return 0;
  75.                
  76.         }
  77.         @Override
  78.         public void close() throws IOException {
  79.                 fin.close();
  80.         }
  81. }
复制代码




3.2根据TextInputFormat实现自己的MyFileInputFormat
并且重载isSplitable方法
  1. package com.cknote.hadoop.recordreader;
  2. import java.io.IOException;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.InputSplit;
  7. import org.apache.hadoop.mapreduce.JobContext;
  8. import org.apache.hadoop.mapreduce.RecordReader;
  9. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {
  12.         @Override
  13.         public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
  14.                         TaskAttemptContext arg1) throws IOException, InterruptedException {
  15.                 return new MyRecordReader();
  16.         }
  17.         @Override
  18.         protected boolean isSplitable(JobContext context, Path filename) {
  19.                 // 文件不分割
  20.                 return false;
  21.         }
  22. }
复制代码




3.3参考HashPartition编写自己的MyPartitioner

  1. package com.cknote.hadoop.recordreader;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. public class MyPartitioner extends Partitioner<LongWritable, Text> {
  6.         @Override
  7.         public int getPartition(LongWritable key, Text value, int numPartitions) {
  8.                 if(key.get() % 2 == 0){
  9.                         key.set(1);//如果是偶数行,key都改为1
  10.                         return 1;//分到一个reduce
  11.                 }
  12.                 else{
  13.                         key.set(0);//如果是奇数行,key都改为0
  14.                         return 0;//分到一个reduce
  15.                 }
  16.                        
  17.         }
  18. }
复制代码



3.4编写自己的Map

  1. package com.cknote.hadoop.recordreader;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  7.         @Override
  8.         protected void map(LongWritable key, Text value,
  9.                         Context context)
  10.                         throws IOException, InterruptedException {
  11.                
  12.                 context.write(key, value);
  13.         }
  14. }
复制代码



3.5编写自己的reduce
  1. package com.cknote.hadoop.recordreader;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Reducer;
  7. public class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {
  8.         @Override
  9.         protected void reduce(LongWritable key, Iterable<Text> values,
  10.                         Context context)
  11.                         throws IOException, InterruptedException {
  12.                 Text keyInfo = new Text();
  13.                 IntWritable valueInfo = new IntWritable();
  14.                
  15.                 int sum = 0;
  16.                 String keyStr = "";
  17.                 for(Text val : values){
  18.                         sum += Integer.parseInt(val.toString());
  19.                         if(keyStr.equals("")){
  20.                                 keyStr = val.toString();       
  21.                         }else{
  22.                                 keyStr = val.toString() + " + " + keyStr;
  23.                         }
  24.                          
  25.                 }
  26.                 valueInfo.set(sum);
  27.                
  28.                 keyStr += " = ";
  29.                 if(key.get() == 0){
  30.                         keyStr = "奇数行之和:" + keyStr;
  31.                 }
  32.                 if(key.get() == 1){
  33.                         keyStr = "偶数行之和:" + keyStr;
  34.                 }
  35.                
  36.                 keyInfo.set(keyStr);
  37.                
  38.                 context.write(keyInfo, valueInfo);
  39.                
  40.         }
  41. }
复制代码





3.5编写主类


  1. package com.cknote.hadoop.recordreader;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.mapreduce.Job;
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  6. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  7. import org.apache.hadoop.util.GenericOptionsParser;
  8. public class TestMain {
  9.         @SuppressWarnings("deprecation")
  10.         public static void main(String[] args) throws Exception {
  11.                 Configuration conf = new Configuration();
  12.                 String[] tmpArgs = {
  13.                                 "hdfs://192.168.137.100:9000/user/hadoop/recordreader/in",
  14.                                 "hdfs://192.168.137.100:9000/user/hadoop/recordreader/out" };
  15.                 String[] otherArgs = new GenericOptionsParser(conf, tmpArgs)
  16.                                 .getRemainingArgs();
  17.                 Job job = new Job(conf, "recordreader");
  18.                 job.setJarByClass(TestMain.class);
  19.                 job.setMapperClass(MyMapper.class);
  20.                 job.setReducerClass(MyReducer.class);
  21.                 job.setPartitionerClass(MyPartitioner.class);
  22.                 job.setInputFormatClass(MyFileInputFormat.class);
  23.                 job.setNumReduceTasks(2);
  24.                
  25.                 //job.setOutputKeyClass(Text.class);
  26.                 //job.setOutputValueClass(IntWritable.class);
  27.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  28.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  29.                 System.out.println(job.waitForCompletion(true));
  30.         }
  31. }
复制代码




4.运行结果:

7.jpg

8.jpg




转自: http://www.jingsizhai.com/

已有(1)人评论

跳转到指定楼层
maizhu 发表于 2014-10-2 00:13:01
帖子不错,值得学习
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条