nettman 发表于 2014-1-13 17:34:44

mapreduce完成字符合并代码分享

现在有这样两个文件
a|b
a|c|d
通过mapreduce处理成
a|b|c|import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class plumebobo0001 {

      public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
                Text myKey = new Text();
                Text myValue = new Text();

                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        if (!value.toString().contains("|"))
                              return;
                        String[] myStr = value.toString().split("\\|");
                        for (int i = 1; i < myStr.length; i++) {
                              myKey.set(myStr + "|" + myStr); //把数据放在Key中输出,value空
                              context.write(myKey, myValue);
                        }
                        
                }
      }

      public static class MyReducer extends Reducer<Text, Text, Text, NullWritable>
      {
                Text myKey = new Text();
                NullWritable myValue;

                public void reduce(Text key, Iterable<Text> values, Context context)
                              throws IOException, InterruptedException {

                        StringBuilder myStr = new StringBuilder("");
                        
                        //迭代取出Key中的数据
                        //重写了grouping,所以这里不用再作二次排序
                        for (Text val : values) {
                              if (myStr.length() == 0) {
                                        myStr.append(key.toString());
                              }
                              else {
                                        myStr.append("|");
                                        myStr.append(key.toString().split("\\|"));
                              }
                        }
                        myKey.set(myStr.toString());
                        context.write(myKey, myValue);
                }
      }

      public static void main(String[] args) throws Exception {
                String Oarg[] = new String;
                Oarg = "/tmp/plumebobo/test0001";
                Oarg = "/tmp/plumebobo/out001";
                Configuration conf = new Configuration();
                conf.set("mapred.job.tracker", "m04.ct1.r01.hdp:9001");
                Job job = new Job(conf, "plumebobo0001");

                job.setJarByClass(plumebobo0001.class);
                job.setMapperClass(MyMapper.class);
                job.setReducerClass(MyReducer.class);
                job.setNumReduceTasks(1);
                job.setOutputFormatClass(TextOutputFormat.class);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(NullWritable.class);

                job.setPartitionerClass(MyPartitioner.class);
                job.setGroupingComparatorClass(MyGroupingComparator.class);

                FileInputFormat.setInputPaths(job, new Path(Oarg));
                FileOutputFormat.setOutputPath(job, new Path(Oarg));
                job.waitForCompletion(true);

      }
}

// 根据第一列 分区
class MyPartitioner extends HashPartitioner<Text, Text>
{
      @Override
      public int getPartition(Text key, Text value, int numPartitions) {
                Text cols = new Text(key.toString().split("\\|"));
                return super.getPartition(cols, value, numPartitions);// cols
      }
}

// 以第一列 值 分组
class MyGroupingComparator implements RawComparator<Text>
{

      // @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                Text key1 = new Text();
                Text key2 = new Text();

                DataInputBuffer buffer = new DataInputBuffer();
                try {
                        buffer.reset(b1, s1, l1);
                        key1.readFields(buffer);
                        buffer.reset(b2, s2, l2);
                        key2.readFields(buffer);
                }
                catch (IOException e) {
                        throw new RuntimeException(e);
                }

                String str1 = key1.toString().split("\\|");
                String str2 = key2.toString().split("\\|");
                return str1.compareTo(str2);
      }

      public int compare(Text o1, Text o2) {
                return 0;
      }
}
页: [1]
查看完整版本: mapreduce完成字符合并代码分享