- public static class AdjGraphMapper extends Mapper<Object, Text, Text, Text> {
- @Override
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String[] arr = value.toString().split("\t");
- if(arr[0].trim().length() == 11){
- if(arr[1].trim().length() == 11){
- context.write(new Text(arr[0]), new Text(arr[1]));
- }else{
- context.write(new Text(arr[0]),new Text());
- }
- }
- }
- }
-
- public static class AdjGraphReducer extends Reducer<Text,Text,Text,Text> {
- @Override
- public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
- HashSet<String> toIdSet = new HashSet<String>();
- Text result = new Text();
- for (Text val : values) {
- toIdSet.add(val.toString());
- }
- result.set(toIdSet.toString().replaceAll("[\\[\\]]", "").replaceAll(", ", "\t"));
- context.write(key, result);
- System.out.println(key+","+result);
- }
- }
- public static void main(String[] args) throws Exception {
- String input = "hdfs://192.168.0.106:9000/user/wsc/data/SM20140623.txt";
- String output = "hdfs://192.168.0.106:9000/user/wsc/data/result2";
-
- Configuration conf = new Configuration();
- Job job = new Job(conf, "adj graph");
- job.setJarByClass(AdjGraph.class);
- job.setMapperClass(AdjGraphMapper.class);
- job.setCombinerClass(AdjGraphReducer.class);
- job.setReducerClass(AdjGraphReducer.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.addInputPath(job, new Path(input));
- FileOutputFormat.setOutputPath(job, new Path(output));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }