要求:将下面的单词中用相同字母组成的单词找出。 下图是原始数据:
遇到的问题:运行结果如下图:
正确 的结果应该如下图:
哪位大神能帮小弟看看是哪里的问题代码如下:
package com.hadoop.test;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class A extends Configured implements Tool {
/*
* 排序、分组
*/
public static class AnagramMapper extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value,Context context) throws IOException , InterruptedException {
// 将Text转换成String
String text = value.toString();
// 将String转换成字符数组,为排序作准备
char[] textCharArr = text.toCharArray();
// 使用 Arrays对数组进行排序
Arrays.sort(textCharArr);
// 排序后的字符串
String sortedText = new String(textCharArr);
context.write(new Text(sortedText), value);
}
}
/*
* 统计相同字母组成的单词
*/
public static class AnagramReduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 值
StringBuilder value = new StringBuilder();
// 计数
int count = 0;
// 拼接单词
for(Text val : values){
// 分割符,
if(value.length() > 0){
value.append(",");
}
value.append(val);
// 计数
count++;
}
// 因为我们要统计相同字母组成的单词,所以相同字母组成的单词个数大于等于2才会输出
if(count > 1){
context.write(key, new Text(value.toString()));
}
}
}
public static void main(String[] args) throws Exception {
String[] arg0 = {
"hdfs://dashuju:9000/dashuju/A/AAAA.txt",
"hdfs://dashuju:9000/dashuju/A/out/"
};
// 执行mapperreduce
int status = ToolRunner.run(new Configuration(), new Anagram(), arg0);
System.exit(status);
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] arg0) throws Exception {
// 加载配置
Configuration conf = new Configuration();
// 输出目录,如果存在就删除
Path path = new Path(arg0[1]);
FileSystem fileSystem = path.getFileSystem(conf);
if(fileSystem.isDirectory(path)){
fileSystem.delete(path, true);
}
// 创建job对象
Job job = new Job(conf,"anagram");
job.setJarByClass(Anagram.class);
// 指定输入、输出目录
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
// 指定mapper、reduce
job.setMapperClass(AnagramMapper.class);
job.setReducerClass(AnagramReduce.class);
// 指定mapper、reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 提交作业
return job.waitForCompletion(true) ? 0: 1;
}
}
|
|