分享

【转】【mapreduce进阶编程五】全局排序

cknote 发表于 2014-9-30 11:18:06 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 21315


Hadoop平台没有提供全局数据排序,而在大规模数据处理中进行数据的全局排序是非常普遍的需求。本文首先实现简单的全局排序,然后再使用Hadoop平台提供的采样器解决数据倾斜的问题。
0.环境:
   hadoop2.4.0 四节点:master(192.168.137.100) slave1(192.168.137.101) slave2(192.168.137.102) slave3(192.168.137.103)
window8下通过eclipse的插件远程提交job到hadoop环境

1.描述:
1.1数据输入:

  1. 5
  2. 7
  3. 6
  4. 9
  5. 1
  6. 11
  7. 79
  8. 36
  9. 55
  10. 66
  11. 77
  12. 34
  13. 29
  14. 99
  15. 101
  16. 202
  17. 987
  18. 123
  19. 456
  20. 714
  21. 562
  22. 234
  23. 1234
  24. 611
  25. 3
  26. 97
  27. 82
复制代码


1.2预计输出:

全局排序,见运行截图

2.设计思路:

   我们可以选择reduce作为节点来分割数据,例如把小于100的让一个ruduce处理,大于100小于1000的第二个reduce处理。。。,每个reduce内部都是排好序的,就实现了全局排序;但是这样硬分割的后果是会产生数据倾斜,分布不均匀,可能会大部分的数据都在某个reduce上,或者某个reduce上的数据非常稀疏。hadoop平台给我们提供了采样器,可以解决这个问题,我们使用简单的随机采样。

3.代码实现:
3.1编写mapper

  1. package com.cknote.hadoop.globlesort;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. public class MyMapper extends
  8.                 Mapper<LongWritable, Text, LongWritable, NullWritable> {
  9.         @Override
  10.         protected void map(LongWritable key, Text value, Context context)
  11.                         throws IOException, InterruptedException {
  12.                 context.write(new LongWritable(Integer.parseInt(value.toString())),
  13.                                 NullWritable.get());
  14.         }
  15. }
复制代码


3.2编写reducer


  1. package com.cknote.hadoop.globlesort;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. public class MyReducer extends
  7.                 Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
  8.         @Override
  9.         protected void reduce(LongWritable key, Iterable<NullWritable> value,
  10.                         Context context) throws IOException, InterruptedException {
  11.                 context.write(key, NullWritable.get());
  12.         }
  13. }
复制代码




3.3编写分区

  1. package com.cknote.hadoop.globlesort;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. public class MyPartitioner extends Partitioner<LongWritable, NullWritable> {
  6.         @Override
  7.         public int getPartition(LongWritable key, NullWritable value,
  8.                         int numPartitions) {
  9.                 long tmp = key.get();
  10.                 if (tmp <= 100) {
  11.                         return 0 % numPartitions;
  12.                 } else if (tmp <= 1000) {
  13.                         return 1 % numPartitions;
  14.                 } else {
  15.                         return 2 % numPartitions;
  16.                 }
  17.         }
  18. }
复制代码





3.4编写主类
  1. package com.cknote.hadoop.globlesort;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.NullWritable;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import org.apache.hadoop.util.GenericOptionsParser;
  10. public class GlobleSortMain1 {
  11.         public static void main(String[] args) throws Exception {
  12.                 Configuration conf = new Configuration();
  13.                 String[] tmpArgs = {
  14.                                 "hdfs://192.168.137.100:9000/user/hadoop/globlesort/in",
  15.                                 "hdfs://192.168.137.100:9000/user/hadoop/globlesort/out" };
  16.                 String[] otherArgs = null;
  17.                 if (args.length != 3) {
  18.                         otherArgs = new GenericOptionsParser(conf, tmpArgs)
  19.                                         .getRemainingArgs();
  20.                 } else {
  21.                         otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  22.                 }
  23.                 Job job = new Job(conf, "globle sort 1");
  24.                 job.setJarByClass(GlobleSortMain1.class);
  25.                 job.setMapperClass(MyMapper.class);
  26.                 job.setReducerClass(MyReducer.class);
  27.                 job.setPartitionerClass(MyPartitioner.class);
  28.                 job.setNumReduceTasks(3);
  29.                 job.setMapOutputKeyClass(LongWritable.class);
  30.                 job.setMapOutputValueClass(NullWritable.class);
  31.                 job.setOutputKeyClass(LongWritable.class);
  32.                 job.setOutputValueClass(NullWritable.class);
  33.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  34.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  35.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  36.         }
  37. }
复制代码



4.运行结果:
1.jpg

2.jpg

3.jpg

5.代码实现
5.1编写主类
   mapper和reducer无须重新编写

  1. package com.cknote.hadoop.globlesort;
  2. import java.net.URI;
  3. import java.util.Date;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.NullWritable;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.filecache.DistributedCache;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
  14. import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
  15. import org.apache.hadoop.util.GenericOptionsParser;
  16. public class GlobleSortMain2 {
  17.         public static void main(String[] args) throws Exception {
  18.                 Configuration conf = new Configuration();
  19.                 String[] tmpArgs = {
  20.                                 "hdfs://192.168.137.100:9000/user/hadoop/globlesort/in",
  21.                                 "hdfs://192.168.137.100:9000/user/hadoop/globlesort/out2" };
  22.                 String[] otherArgs = null;
  23.                 if (args.length != 2) {
  24.                         otherArgs = new GenericOptionsParser(conf, tmpArgs)
  25.                                         .getRemainingArgs();
  26.                 } else {
  27.                         otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  28.                 }
  29.                 Job job = new Job(conf, "globle sort 2");
  30.                 job.setJarByClass(GlobleSortMain2.class);
  31.                 job.setMapperClass(MyMapper.class);
  32.                 job.setReducerClass(MyReducer.class);
  33.                 job.setNumReduceTasks(3);
  34.                 job.setMapOutputKeyClass(LongWritable.class);
  35.                 job.setMapOutputValueClass(NullWritable.class);
  36.                 job.setOutputKeyClass(LongWritable.class);
  37.                 job.setOutputValueClass(NullWritable.class);
  38.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  39.                 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  40.                 job.setPartitionerClass(TotalOrderPartitioner.class);
  41.                 InputSampler.RandomSampler<LongWritable, NullWritable> sampler
  42.                 = new InputSampler.RandomSampler<LongWritable, NullWritable>(
  43.                                 0.1, 100, 3);
  44.                 Path partitionFile = new Path(otherArgs[0], "_sortpartitions");
  45.                 TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
  46.                 InputSampler.writePartitionFile(job, sampler);
  47.                 // 一般都将该文件做distribute cache处理
  48.                 URI partitionURI = new URI(partitionFile.toString()
  49.                                 + "#_sortpartitions");
  50.                 // 从上面可以看出 采样器是在map阶段之前进行的
  51.                 // 在提交job的client端完成的
  52.                 DistributedCache.addCacheFile(partitionURI, conf);
  53.                 DistributedCache.createSymlink(conf);
  54.                 Date startTime = new Date();
  55.                 System.out.println("Job started: " + startTime);
  56.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  57.                 Date end_time = new Date();
  58.                 System.out.println("Job ended: " + end_time);
  59.                 System.out.println("The job took "
  60.                                 + (end_time.getTime() - startTime.getTime()) / 1000
  61.                                 + " seconds.");
  62.         }
  63. }
复制代码





6运行结果

4.jpg
5.jpg

6.jpg





已有(3)人评论

跳转到指定楼层
sun1 发表于 2015-4-22 07:49:09
感谢分享,学习了。
回复

使用道具 举报

admln 发表于 2015-9-23 15:51:39
The type DistributedCache is deprecated
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条