Here are two input files:

a|b
a|c|d

Processed with MapReduce, they should become:

a|b|c|d

The trick: the mapper emits a composite key "firstColumn|otherColumn" for every column after the first, a custom partitioner and grouping comparator then partition and group by the first column alone, and the reducer splices the second columns of the whole group back together. A sketch of the intermediate data follows; the full job comes after that.
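To see the data flow concretely (illustrative, assuming the two sample files above):

map output (key, value):  ("a|b", ""), ("a|c", ""), ("a|d", "")
partition by "a":         all three records go to the same reducer
group by "a":             one reduce() call iterates the keys a|b, a|c, a|d
reduce output:            a|b|c|d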
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class plumebobo0001 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text myKey = new Text();
        Text myValue = new Text(); // stays empty; the data rides in the key

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (!value.toString().contains("|"))
                return;
            String[] myStr = value.toString().split("\\|");
            for (int i = 1; i < myStr.length; i++) {
                // Emit "firstColumn|otherColumn" as the key, with an empty value
                myKey.set(myStr[0] + "|" + myStr[i]);
                context.write(myKey, myValue);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, NullWritable> {
        Text myKey = new Text();
        NullWritable myValue = NullWritable.get();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            StringBuilder myStr = new StringBuilder();

            // Walk the group. The grouping comparator puts every record sharing
            // the same first column into one reduce() call, and the key object is
            // updated to the current record's composite key on each iteration,
            // so no separate secondary-sort step is needed here.
            for (Text val : values) {
                if (myStr.length() == 0) {
                    // first record: keep the whole composite key, e.g. "a|b"
                    myStr.append(key.toString());
                } else {
                    // later records: append only the second column, e.g. "|c"
                    myStr.append("|");
                    myStr.append(key.toString().split("\\|")[1]);
                }
            }
            myKey.set(myStr.toString());
            context.write(myKey, myValue);
        }
    }

    public static void main(String[] args) throws Exception {
        String[] ioArgs = new String[2];
        ioArgs[0] = "/tmp/plumebobo/test0001";
        ioArgs[1] = "/tmp/plumebobo/out001";
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "m04.ct1.r01.hdp:9001");
        Job job = new Job(conf, "plumebobo0001");

        job.setJarByClass(plumebobo0001.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Partition and group on the first column only
        job.setPartitionerClass(MyPartitioner.class);
        job.setGroupingComparatorClass(MyGroupingComparator.class);

        FileInputFormat.setInputPaths(job, new Path(ioArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(ioArgs[1]));
        job.waitForCompletion(true);
    }
}

// Partition on the first column, so all records sharing it
// land on the same reducer.
class MyPartitioner extends HashPartitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        Text firstCol = new Text(key.toString().split("\\|")[0]);
        return super.getPartition(firstCol, value, numPartitions);
    }
}

// Group on the value of the first column: composite keys that share the
// first field compare as equal, so they fall into the same reduce() call.
class MyGroupingComparator implements RawComparator<Text> {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        Text key1 = new Text();
        Text key2 = new Text();

        // Deserialize both raw keys back into Text before comparing
        DataInputBuffer buffer = new DataInputBuffer();
        try {
            buffer.reset(b1, s1, l1);
            key1.readFields(buffer);
            buffer.reset(b2, s2, l2);
            key2.readFields(buffer);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        String str1 = key1.toString().split("\\|")[0];
        String str2 = key2.toString().split("\\|")[0];
        return str1.compareTo(str2);
    }

    // The shuffle calls the raw-bytes overload above; this one is only here
    // to satisfy the interface, but it should agree with the raw comparison.
    @Override
    public int compare(Text o1, Text o2) {
        return o1.toString().split("\\|")[0].compareTo(o2.toString().split("\\|")[0]);
    }
}
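For comparison, the same grouping is often written by extending WritableComparator, which handles the byte-level deserialization for you. A minimal sketch (the class name FirstColumnGroupingComparator is mine, not from the original post):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch of an equivalent grouping comparator. Passing true to the super
// constructor tells WritableComparator to create Text instances and
// deserialize the raw bytes, so only the object comparison is needed.
class FirstColumnGroupingComparator extends WritableComparator {
    protected FirstColumnGroupingComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String first1 = a.toString().split("\\|")[0];
        String first2 = b.toString().split("\\|")[0];
        return first1.compareTo(first2);
    }
}

If you use this variant, register it with job.setGroupingComparatorClass(FirstColumnGroupingComparator.class) instead of MyGroupingComparator.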
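To try the job end to end, the flow would look roughly like this (the jar name and local file names are assumptions; the HDFS paths are the ones hard-coded in main):

hadoop fs -mkdir -p /tmp/plumebobo/test0001
hadoop fs -put file1 file2 /tmp/plumebobo/test0001
hadoop jar plumebobo0001.jar plumebobo0001
hadoop fs -cat /tmp/plumebobo/out001/part-r-00000

With the two sample files, part-r-00000 should contain the single line a|b|c|d.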