分享

hadoop mapreduce 统计次数及排序使用两个mapreduce

lzw 发表于 2013-12-19 23:02:30 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13386
本帖最后由 pig2 于 2014-2-14 23:53 编辑

下面的例子是统计密码次数,及其密码使用次数排序,其中使用两个mapreduce:
  1. import java.io.IOException;
  2. import java.util.Random;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.io.WritableComparable;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15. public class CountPwd{
  16.   public static class CountMapper extends Mapper<Object, Text, Text, IntWritable>{
  17.     private final static IntWritable one = new IntWritable(1);
  18.     private Text password = new Text();
  19.       
  20.     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  21.         String eachline=value.toString();
  22.         String [] eachterm=eachline.split("#");
  23.         password.set(eachterm[1]);
  24.         context.write(password, one);
  25.     }
  26.   }
  27.   
  28.   public static class CountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
  29.     private IntWritable total = new IntWritable();
  30.     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  31.       
  32.       int sum = 0;
  33.       for (IntWritable val : values) {
  34.         sum += val.get();
  35.       }
  36.       total.set(sum);
  37.       context.write(key,total);
  38.     }
  39.   }
  40.   public static class SortMapper extends Mapper<Object, Text, IntWritable,Text>{
  41.             
  42.             public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  43.               
  44.                     IntWritable times = new IntWritable(1);
  45.                      Text password = new Text();
  46.                     String eachline=value.toString();
  47.                     String[] eachterm =eachline.split("        ");
  48.                     
  49.                     if(eachterm.length==2){
  50.                             password.set(eachterm[0]);
  51.                             times.set(Integer.parseInt(eachterm[1]));
  52.                             context.write(times,password);
  53.                     }else{
  54.                             password.set("errorpassword");
  55.                             context.write(times,password);
  56.                     }
  57.             }
  58.           }
  59.          
  60.           public static class SortReducer extends Reducer<IntWritable,Text,IntWritable,Text> {
  61.                   private Text password = new Text();
  62.                   public void reduce(IntWritable key,Iterable<Text> values, Context context) throws IOException, InterruptedException {
  63.                     //不同的密码可能出现相同的次数
  64.                     for (Text val : values) {
  65.                         password.set(val);
  66.                         context.write(key,password);
  67.                     }
  68.             }
  69.         }
  70.   private static class IntDecreasingComparator extends IntWritable.Comparator {
  71.       public int compare(WritableComparable a, WritableComparable b) {
  72.         return -super.compare(a, b);
  73.       }
  74.       
  75.       public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  76.           return -super.compare(b1, s1, l1, b2, s2, l2);
  77.       }
  78.   }
  79.   
  80.   public static void main(String[] args) throws Exception {
  81.          
  82.             Configuration conf = new Configuration();
  83.             Job job = new Job(conf, "Counter pwd");
  84.             job.setJarByClass(CountPwd.class);
  85.             job.setMapperClass(CountMapper.class);
  86.             job.setCombinerClass(CountReducer.class);
  87.             job.setReducerClass(CountReducer.class);
  88.             
  89.             job.setOutputKeyClass(Text.class);
  90.             job.setOutputValueClass(IntWritable.class);
  91.             
  92.             //定义一个临时目录,先将词频统计任务的输出结果写到临时目录中, 下一个排序任务以临时目录为输入目录。
  93.             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  94.             Path tempDir = new Path("CountPwd-tmp");
  95.             FileOutputFormat.setOutputPath(job, tempDir);
  96.             
  97.             if(job.waitForCompletion(true))
  98.                 {
  99.                         Job sortJob = new Job(conf, "pwd by sort");
  100.                         sortJob.setJarByClass(CountPwd.class);
  101.                         
  102.                         FileInputFormat.addInputPath(sortJob, tempDir);
  103.                         
  104.                         sortJob.setMapperClass(SortMapper.class);
  105.                         FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
  106.             
  107.                         sortJob.setOutputKeyClass(IntWritable.class);
  108.                         sortJob.setOutputValueClass(Text.class);
  109.                         
  110.                         sortJob.setSortComparatorClass(IntDecreasingComparator.class);
  111.                         FileSystem.get(conf).deleteOnExit(tempDir);
  112.             
  113.                         System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
  114.                 }
  115.             
  116.             System.exit(job.waitForCompletion(true) ? 0 : 1);
  117.   }
  118. }
复制代码
欢迎加入about云群9037177932227315139327136 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条