A MapReduce job sorts keys within each reducer, but Hadoop does not give you a globally sorted result across reducers out of the box, and a global sort is a very common requirement in large-scale data processing. This article first implements a simple global sort with a hand-written partitioner, and then uses the sampler provided by Hadoop to deal with data skew.
0. Environment: hadoop 2.4.0; four nodes: master (192.168.137.100), slave1 (192.168.137.101), slave2 (192.168.137.102), slave3 (192.168.137.103); jobs are submitted remotely to the cluster from Eclipse (with the Hadoop plugin) on Windows 8.
1. Description:
1.1 Input data:
5
7
6
9
1
11
79
36
55
66
77
34
29
99
101
202
987
123
456
714
562
234
1234
611
3
97
82
1.2 Expected output:
The same numbers in globally sorted order; see the run results below.
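To run the jobs below, this sample data has to be in HDFS under the input path used by the drivers (hdfs://192.168.137.100:9000/user/hadoop/globlesort/in). Uploading a local file with hdfs dfs -put is the usual way; as a hedged alternative, the small helper below (the class name InputDataWriter and the file name data.txt are made up for illustration) writes the numbers there directly through the HDFS API:

package com.cknote.hadoop.globlesort;

import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: writes the sample numbers into the HDFS input directory
// used by the drivers in this article.
public class InputDataWriter {

    public static void main(String[] args) throws Exception {
        int[] numbers = { 5, 7, 6, 9, 1, 11, 79, 36, 55, 66, 77, 34, 29, 99,
                101, 202, 987, 123, 456, 714, 562, 234, 1234, 611, 3, 97, 82 };

        Configuration conf = new Configuration();
        // Hypothetical file name inside the input directory from the drivers.
        Path in = new Path(
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/in/data.txt");
        FileSystem fs = FileSystem.get(in.toUri(), conf);

        // One number per line, matching the format MyMapper expects.
        PrintWriter writer = new PrintWriter(fs.create(in, true));
        for (int n : numbers) {
            writer.println(n);
        }
        writer.close();
    }
}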
2. Design:
We can split the data across the reducers by key range: for example, have one reducer handle keys up to 100, a second one handle keys above 100 and up to 1000, and so on. Each reducer's output is sorted internally, so concatenating the reducer outputs in partition order yields a global sort; with the sample data above, reducer 0 would receive 1, 3, 5, ..., 99, reducer 1 would receive 101, ..., 987, and reducer 2 only 1234. The drawback of such hard-coded ranges is data skew: the distribution can be very uneven, with most of the data landing on one reducer while another gets almost nothing. Hadoop provides input samplers to address this; here we use the simple random sampler.
3. Implementation:
3.1 The mapper
package com.cknote.hadoop.globlesort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends
        Mapper<LongWritable, Text, LongWritable, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line holds one number; emit it as the key so the
        // framework sorts by it. The value carries no information.
        context.write(new LongWritable(Long.parseLong(value.toString().trim())),
                NullWritable.get());
    }
}
3.2 The reducer
package com.cknote.hadoop.globlesort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends
        Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> value,
            Context context) throws IOException, InterruptedException {
        // Keys arrive at each reducer already sorted; just write them out.
        context.write(key, NullWritable.get());
    }
}
3.3 The partitioner
package com.cknote.hadoop.globlesort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hard-coded range partitioner: keys <= 100 go to reducer 0,
// keys <= 1000 to reducer 1, everything else to reducer 2.
public class MyPartitioner extends Partitioner<LongWritable, NullWritable> {

    @Override
    public int getPartition(LongWritable key, NullWritable value,
            int numPartitions) {
        long tmp = key.get();
        if (tmp <= 100) {
            return 0 % numPartitions;
        } else if (tmp <= 1000) {
            return 1 % numPartitions;
        } else {
            // The modulo keeps the index valid if fewer than 3 reducers are used.
            return 2 % numPartitions;
        }
    }
}
3.4 The driver class

package com.cknote.hadoop.globlesort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobleSortMain1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default input/output paths, used when no arguments are given
        // (convenient when submitting from Eclipse).
        String[] tmpArgs = {
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/in",
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/out" };

        String[] otherArgs = null;
        if (args.length != 2) {
            otherArgs = new GenericOptionsParser(conf, tmpArgs)
                    .getRemainingArgs();
        } else {
            otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        }

        Job job = Job.getInstance(conf, "globle sort 1");

        job.setJarByClass(GlobleSortMain1.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(MyPartitioner.class);

        // One reducer per key range in MyPartitioner.
        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4. Run results:
The output directory contains one file per reducer (part-r-00000, part-r-00001, part-r-00002); concatenated in that order they give the globally sorted sequence.
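As a quick sanity check, the sketch below (the class name SortOutputChecker is made up; the output path is the one used by GlobleSortMain1) reads the three part files in partition order and reports any value that is smaller than its predecessor:

package com.cknote.hadoop.globlesort;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: verifies the global order of the job output.
public class SortOutputChecker {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outDir = new Path(
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/out");
        FileSystem fs = FileSystem.get(outDir.toUri(), conf);

        long previous = Long.MIN_VALUE;
        // With 3 reduce tasks the outputs are part-r-00000 .. part-r-00002.
        for (int i = 0; i < 3; i++) {
            Path part = new Path(outDir, String.format("part-r-%05d", i));
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(part)));
            String line;
            while ((line = reader.readLine()) != null) {
                long current = Long.parseLong(line.trim());
                if (current < previous) {
                    System.out.println("Out of order: " + current
                            + " appears after " + previous);
                }
                previous = current;
            }
            reader.close();
        }
        System.out.println("Check finished.");
    }
}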
5. Implementation with the sampler:
5.1 The driver class (the mapper and reducer from above are reused unchanged)
package com.cknote.hadoop.globlesort;

import java.net.URI;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobleSortMain2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] tmpArgs = {
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/in",
                "hdfs://192.168.137.100:9000/user/hadoop/globlesort/out2" };

        String[] otherArgs = null;
        if (args.length != 2) {
            otherArgs = new GenericOptionsParser(conf, tmpArgs)
                    .getRemainingArgs();
        } else {
            otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        }

        Job job = Job.getInstance(conf, "globle sort 2");

        job.setJarByClass(GlobleSortMain2.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setNumReduceTasks(3);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Let TotalOrderPartitioner pick the split points from a sampled
        // partition file instead of hard-coding the key ranges.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.RandomSampler<LongWritable, NullWritable> sampler
                = new InputSampler.RandomSampler<LongWritable, NullWritable>(
                        0.1, 100, 3);

        // The leading underscore makes FileInputFormat ignore this file
        // even though it lives in the input directory.
        Path partitionFile = new Path(otherArgs[0], "_sortpartitions");

        // Register the partition file on the job's own configuration; the
        // Job constructor copies conf, so setting it on conf at this point
        // would not be visible to the submitted job.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                partitionFile);

        // Sampling runs on the client that submits the job,
        // i.e. before the map phase starts.
        InputSampler.writePartitionFile(job, sampler);

        // The partition file is usually also shipped via the distributed cache.
        URI partitionURI = new URI(partitionFile.toString()
                + "#_sortpartitions");
        job.addCacheFile(partitionURI);

        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        boolean success = job.waitForCompletion(true);
        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took "
                + (endTime.getTime() - startTime.getTime()) / 1000
                + " seconds.");
        System.exit(success ? 0 : 1);
    }
}
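One caveat about GlobleSortMain2: InputSampler samples the keys produced by the job's InputFormat, not the keys emitted by MyMapper. With the default TextInputFormat those keys are byte offsets, so the output is still totally ordered (TotalOrderPartitioner only needs increasing split points), but the split points follow file offsets rather than the value distribution, which weakens the protection against skew that sampling is meant to provide. A common remedy is to first convert the input into a SequenceFile whose keys are the values to sort and run the sort job with the identity mapper, so the sampler and the partitioner see the same keys. Below is a hedged sketch of such a driver; the class name GlobleSortMain3 and the seq/out3 paths are assumptions, and the conversion job that produces the SequenceFile is not shown:

package com.cknote.hadoop.globlesort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Sketch only: sorts a SequenceFile<LongWritable, NullWritable> so that the
// sampled keys are the same keys the TotalOrderPartitioner partitions on.
public class GlobleSortMain3 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "globle sort 3");
        job.setJarByClass(GlobleSortMain3.class);

        // Identity mapper: the input keys are already the values to sort.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(3);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Hypothetical paths: "seq" holds the preprocessed SequenceFile input.
        FileInputFormat.addInputPath(job,
                new Path("hdfs://192.168.137.100:9000/user/hadoop/globlesort/seq"));
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.137.100:9000/user/hadoop/globlesort/out3"));

        job.setPartitionerClass(TotalOrderPartitioner.class);
        // The partitioner reads the partition file directly from this path.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("hdfs://192.168.137.100:9000/user/hadoop/globlesort/_partitions3"));

        // Now the sampled keys are the actual values, so the split points
        // follow the data distribution.
        InputSampler.writePartitionFile(job,
                new InputSampler.RandomSampler<LongWritable, NullWritable>(
                        0.1, 100, 3));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}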
6. Run results