本帖最后由 pig2 于 2016-6-23 14:32 编辑
问题导读:
1. Hadoop默认的HashPartitioner是如何进行分区的?
2. 如何自定义HashPartitioner?
3. 如何自定义Partitioner?
4. 其他的Partitioner有哪些?
概述
Partitioner 组件可以让 Map 对 Key 进行分区,从而将不同分区的 Key 交由不同的 Reduce 处理。如果这么说让你觉得有一些笼统的话,那么本文可能很适合你,因为本文要依据某一个实例进行说明。
需求场景假设我们现在要统计各个省份的男女人数,每个省份的数据单独保存。而我们的原始数据是这样的: [mw_shl_code=applescript,true]Fern girl guangdong
Alice girl jiangsu
Bunny girl shanghai
Amy girl xian
Walker boy guangdong
Ingram boy shichuang
Paul boy shichuang
Caroline girl jiangsu
Esther girl jiangsu
Eve girl tianjing[/mw_shl_code]
第一个字段为名字,第二个字段为性别,第三个字段为省份。这是其中一个文件中的内容,全部的文件列表如下:
Partitioner 组件这里我们不讨论 MapReduce 过程,通过前面的学习,这一点我相信你还是能够应付的。
HashPartitioner在一般的 MapReduce 过程中,我们知道可以通过 job.setNumReduceTasks(N) 来创建多个 Reducer 处理结果。可是,这种情况下,系统会调用默认的 Partitioner 也就是 HashPartitioner 来对 Map 的 key 进行分区。下面是 HashPartitioner 的源码。 [mw_shl_code=java,true]public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
[/mw_shl_code]
这里可以看到默认的 HashPartitioner 对分区的处理过程。可是,这种处理有一个最大的问题,我们的分区中可能会有很多不同的 key 被分到了同一个分区中了。因为这里是对 numReduceTasks 进行取余,你的 hashCode 相差再大也于事无补。
自定义 HashPartitioner(Hash)上面默认的 HashPartitioner 解决起来会有一些问题,所以这里我们就需要自定义 Partitioner 组件。下面是我第一次进行自定义的 Partitioner 组件,也是用到了一个 hashCode()。 [mw_shl_code=java,true]
public static class WordcountHashPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String location = key.toString().split(":")[0];
return Math.abs(location.hashCode() * 127) % numPartitions;
}
}[/mw_shl_code]
运行 Hadoop 程序,不出所料也出现相同的问题:
自定义 HashPartitioner(非 Hash)从上图的结果中可以看到各个文件的内容相差还是挺大的,尤其是其中还有一些文件没有内容。这正是由于有一部分 key 被分到其他 key 的分区里面去了。于是,我把代码修改了,如下( 如果你不喜欢使用 switch,可以使用一些重构手法 ): [mw_shl_code=java,true]public static class WordcountHashPartitionerNew extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
String location = key.toString().split(":")[0];
switch (location) {
case "anhui":
return 0;
case "beijing":
return 1;
( ... 此处省略 N 行 ...)
case "zhejiang":
return 16;
}
return 0;
}
}
[/mw_shl_code]
修改之后,再来看执行结果就正常多了。
客户端调用
[mw_shl_code=java,true]public class PartitionerClient {
( ... 此处省略 N 行 ...)
public static void main(String[] args) throws Exception {
PartitionerClient client = new PartitionerClient();
client.execute(args);
}
private void execute(String[] args) throws Exception {
( ... 此处省略 N 行 ...)
runWordCountJob(inputPath, outputPath);
}
private int runWordCountJob(String inputPath, String outputPath) throws Exception {
( ... 此处省略 N 行 ...)
job.setMapperClass(CorePartitioner.CoreMapper.class);
job.setCombinerClass(CorePartitioner.CoreReducer.class);
job.setPartitionerClass(CorePartitioner.WordcountHashPartitionerNew.class);
job.setNumReduceTasks(17);
job.setReducerClass(CorePartitioner.CoreReducer.class);
( ... 此处省略 N 行 ...)
}
}[/mw_shl_code]
这里调用的方式也就两句话:
[mw_shl_code=java,true]
job.setPartitionerClass(CorePartitioner.WordcountHashPartitionerNew.class);
job.setNumReduceTasks(17);[/mw_shl_code]
前一句没什么好说的,与 job.setMapperClass(CorePartitioner.CoreMapper.class) 都是类似的。关于后一句,也就是设置 ReduceTasks 的个数。这个值会传递给 getPartition() 的 numPartitions 参数。
其他 Partitioner查看 Partitioner 的 API 可以看到 Partitioner 的 4 个实现类: [mw_shl_code=java,true]
BinaryPartitioner, HashPartitioner, KeyFieldBasedPartitioner, TotalOrderPartitioner[/mw_shl_code] - BinaryPartitioner
- HashPartitioner
- KeyFieldBasedPartitioner
- TotalOrderPartitioner
版权说明
著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
本文作者:Q-WHai
发表日期: 2016年6月21日
本文链接:http://blog.csdn.net/lemon_tree12138/article/details/51730960
来源:CSDN
|