The default TextInputFormat reads input line by line: the key is the byte offset at which each line starts and the value is the line's content. This post implements an input format whose key is the line number instead of the offset.
0. Environment:
Hadoop 2.4.0 on a four-node cluster: master (192.168.137.100), slave1 (192.168.137.101), slave2 (192.168.137.102), slave3 (192.168.137.103). Jobs are submitted remotely to the cluster from Windows 8 through the Eclipse Hadoop plugin.
1. Problem description:
The task: sum the values on the odd-numbered lines and on the even-numbered lines separately.

Input data (one integer per line, as implied by the expected output below):

10
20
30
40
50
60
70
Expected output:

- Sum of odd lines: 10 + 30 + 50 + 70 = 160
- Sum of even lines: 20 + 40 + 60 = 120
2. Design: To sum odd and even lines separately, we first need the line number of each record, so we make the map input key the line number. The default TextInputFormat uses LineRecordReader as its RecordReader, whose key is the byte offset, so we have to write our own InputFormat and RecordReader. In addition, all odd lines must end up at one reducer and all even lines at another (strictly speaking, each reducer pulls its own partition of the map output), so we also write a custom partitioner, MyPartitioner. The resulting data flow is sketched below.
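Schematically, using the sample data from section 1 (values shown purely for illustration):

(line 1, "10") -> MyPartitioner -> key rewritten to 0 -> reducer 0
(line 2, "20") -> MyPartitioner -> key rewritten to 1 -> reducer 1
(line 3, "30") -> MyPartitioner -> key rewritten to 0 -> reducer 0
...
Reducer 0 sums all odd-line values; reducer 1 sums all even-line values.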
3. Implementation:
3.1 First, modeled on LineRecordReader, we write our own MyRecordReader:
package com.cknote.hadoop.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader extends RecordReader<LongWritable, Text> {

    private long start;   // start offset of the split
    private long end;     // end offset of the split
    private long pos;     // current line number (1-based)
    private FSDataInputStream fin = null;
    private LongWritable key = null;
    private Text value = null;
    private LineReader reader = null;

    /**
     * Called once before any records are read: open the file,
     * seek to the start of the split, and start counting lines from 1.
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;

        start = fileSplit.getStart();
        end = start + fileSplit.getLength();

        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fin = fs.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        pos = 1;
    }

    /**
     * Advances to the next record: the key is the current line number,
     * the value is the content of that line.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }

        if (reader.readLine(value) == 0) {
            return false;
        }
        pos++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        fin.close();
    }
}
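One small gap in the reader above: getProgress() always returns 0, so the framework cannot report map-side progress within the split. A minimal sketch, assuming progress can be approximated by how far fin has advanced through the split:

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        }
        // Fraction of the split consumed so far, based on the stream position.
        return Math.min(1.0f, (fin.getPos() - start) / (float) (end - start));
    }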
3.2 Based on TextInputFormat, implement our own MyFileInputFormat and override isSplitable:

package com.cknote.hadoop.recordreader;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split the input file.
        return false;
    }
}
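Disabling splitting here is not just a simplification: MyRecordReader starts counting from 1 in every split, so if a large file were split, line numbers would restart inside each split and the odd/even classification would no longer correspond to positions in the whole file. Keeping the file in a single split guarantees globally correct line numbers, at the cost of map-side parallelism.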
3.3 Modeled on HashPartitioner, write our own MyPartitioner:
package com.cknote.hadoop.recordreader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<LongWritable, Text> {

    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        if (key.get() % 2 == 0) {
            key.set(1);   // even line: rewrite the key to 1
            return 1;     // ...and send it to reducer 1
        } else {
            key.set(0);   // odd line: rewrite the key to 0
            return 0;     // ...and send it to reducer 0
        }
    }
}
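Two things are worth noting about this partitioner. First, it mutates the key inside getPartition(); this works because the partitioner is invoked on the map output key before that key is serialized into the map output buffer, so odd lines really are written with key 0 and even lines with key 1, and each reducer therefore receives exactly one group. Second, the partition numbers are hard-coded to 0 and 1, so the job must run with exactly two reduce tasks, which the driver below enforces via setNumReduceTasks(2).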
3.4 Write our own Mapper:
package com.cknote.hadoop.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Identity mapper: forward (line number, line content) unchanged.
        context.write(key, value);
    }
}
3.5 Write our own Reducer:

package com.cknote.hadoop.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        Text keyInfo = new Text();
        IntWritable valueInfo = new IntWritable();

        // Because MyPartitioner rewrote every key to 0 or 1, each reducer
        // receives a single group containing all of its lines.
        int sum = 0;
        String keyStr = "";
        for (Text val : values) {
            sum += Integer.parseInt(val.toString());
            if (keyStr.equals("")) {
                keyStr = val.toString();
            } else {
                keyStr = val.toString() + " + " + keyStr;
            }
        }
        valueInfo.set(sum);

        keyStr += " = ";
        if (key.get() == 0) {
            keyStr = "Sum of odd lines: " + keyStr;
        }
        if (key.get() == 1) {
            keyStr = "Sum of even lines: " + keyStr;
        }

        keyInfo.set(keyStr);

        context.write(keyInfo, valueInfo);
    }
}
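Note that the terms in keyStr appear in reverse arrival order, since each new value is prepended, and the string already ends with " = "; the default TextOutputFormat then writes the pair as the string, a tab, and the sum.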
3.6 Write the driver class:
package com.cknote.hadoop.recordreader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        String[] tmpArgs = {
                "hdfs://192.168.137.100:9000/user/hadoop/recordreader/in",
                "hdfs://192.168.137.100:9000/user/hadoop/recordreader/out" };
        String[] otherArgs = new GenericOptionsParser(conf, tmpArgs)
                .getRemainingArgs();

        Job job = new Job(conf, "recordreader");
        job.setJarByClass(TestMain.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setInputFormatClass(MyFileInputFormat.class);
        job.setNumReduceTasks(2);

        //job.setOutputKeyClass(Text.class);
        //job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.out.println(job.waitForCompletion(true));
    }
}
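A note on the two commented-out lines: the map output key/value classes default to the job's output key/value classes, which themselves default to LongWritable and Text, exactly matching MyMapper's output, so the shuffle works without setting anything explicitly. Uncommenting setOutputKeyClass(Text.class) and setOutputValueClass(IntWritable.class) without also calling setMapOutputKeyClass/setMapOutputValueClass would make the map-side spill fail with a key type mismatch, which is presumably why they are left commented out; TextOutputFormat writes the reducer's Text/IntWritable pairs either way. With setNumReduceTasks(2), the odd-line sum lands in part-r-00000 and the even-line sum in part-r-00001 under the output directory.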
4. Result:
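Based on the code above, the job should leave two files under hdfs://192.168.137.100:9000/user/hadoop/recordreader/out (viewable with hadoop fs -cat), containing roughly:

part-r-00000: Sum of odd lines: 70 + 50 + 30 + 10 =     160
part-r-00001: Sum of even lines: 60 + 40 + 20 =     120

(the order of the terms inside each sum is not guaranteed, since all lines in a partition share the same key).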
Source: http://www.jingsizhai.com/