Sharing MapReduce code for merging pipe-delimited fields
Suppose we have two input records like these:
a|b
a|c|d
and we want MapReduce to merge them on the shared first column into:
a|b|c|d
The idea: the mapper emits one key per trailing column in the form first|col (with an empty value), a custom partitioner and grouping comparator route and group the keys by their first column, and the reducer concatenates the second columns back onto the shared first column.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class plumebobo0001 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
Text myKey = new Text();
Text myValue = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (!value.toString().contains("|"))
return;
String[] myStr = value.toString().split("\\|");
for (int i = 1; i < myStr.length; i++) {
myKey.set(myStr[0] + "|" + myStr[i]); // emit the data in the key as "first|col"; the value stays empty
context.write(myKey, myValue);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, NullWritable>
{
Text myKey = new Text();
NullWritable myValue = NullWritable.get();
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder myStr = new StringBuilder("");
// iterate over the (empty) values; each step advances the current key within the group
// the grouping comparator is overridden, so no further secondary sort is needed here
for (Text val : values) {
if (myStr.length() == 0) {
myStr.append(key.toString());
}
else {
myStr.append("|");
myStr.append(key.toString().split("\\|")[1]); // append only the second column of each later key
}
}
myKey.set(myStr.toString());
context.write(myKey, myValue);
}
}
public static void main(String[] args) throws Exception {
String[] Oarg = new String[2];
Oarg[0] = "/tmp/plumebobo/test0001"; // input path
Oarg[1] = "/tmp/plumebobo/out001"; // output path
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "m04.ct1.r01.hdp:9001");
Job job = new Job(conf, "plumebobo0001");
job.setJarByClass(plumebobo0001.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(MyPartitioner.class);
job.setGroupingComparatorClass(MyGroupingComparator.class);
FileInputFormat.setInputPaths(job, new Path(Oarg[0]));
FileOutputFormat.setOutputPath(job, new Path(Oarg[1]));
job.waitForCompletion(true);
}
}
// Partition by the first column
class MyPartitioner extends HashPartitioner<Text, Text>
{
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Text cols = new Text(key.toString().split("\\|")[0]);
return super.getPartition(cols, value, numPartitions); // hash only the first column so keys sharing it reach the same reducer
}
}
// Group by the value of the first column
class MyGroupingComparator implements RawComparator<Text>
{
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Text key1 = new Text();
Text key2 = new Text();
DataInputBuffer buffer = new DataInputBuffer();
try {
buffer.reset(b1, s1, l1);
key1.readFields(buffer);
buffer.reset(b2, s2, l2);
key2.readFields(buffer);
}
catch (IOException e) {
throw new RuntimeException(e);
}
String str1 = key1.toString().split("\\|")[0];
String str2 = key2.toString().split("\\|")[0];
return str1.compareTo(str2);
}
public int compare(Text o1, Text o2) {
return 0; // the framework uses the raw-bytes compare above for grouping
}
}
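To sanity-check the flow without a cluster, here is a minimal plain-Java sketch (an illustration added here, not part of the original job; the class name MergeSketch is made up) that simulates the map-side key construction, the sort, the first-column grouping, and the reduce-side concatenation for the sample records above:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MergeSketch {
public static void main(String[] args) {
String[] lines = { "a|b", "a|c|d" };
// Map phase: emit "firstCol|otherCol" for every column after the first
List<String> mapOutput = new ArrayList<String>();
for (String line : lines) {
String[] cols = line.split("\\|");
for (int i = 1; i < cols.length; i++) {
mapOutput.add(cols[0] + "|" + cols[i]);
}
}
// Shuffle phase: sort; the keys a|b, a|c, a|d compare equal on "a"
// under MyGroupingComparator, so they land in a single reduce() call
Collections.sort(mapOutput);
// Reduce phase: keep the first key whole, then append second columns
StringBuilder merged = new StringBuilder();
for (String key : mapOutput) {
if (merged.length() == 0) merged.append(key);
else merged.append("|").append(key.split("\\|")[1]);
}
System.out.println(merged); // prints a|b|c|d
}
}
Its output, a|b|c|d, matches the single line the reducer writes for this input.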