分享

Mapreduce分区Partition解析

nettman 发表于 2014-6-25 19:20:56 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 23621

问题导读:
Partition的作用是什么?
Mapreduce默认的partitioner是什么?还有其它partitioner是什么?
patition类结构都包含什么?
Mapreduce全局排序会用到那个类?









partition所在位置.JPG


Partition位置

Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:

1)均衡负载,尽量的将工作均匀的分配给不同的reduce。

2)效率,分配速度一定要快。



Mapreduce提供的Partitioner

Mapreduce默认的partitioner是HashPartitioner。除了这个mapreduce还提供了3种partitioner。如下图所示:

patition类结构.jpg


patition类结构

1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

2.HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

3.BinaryPatitioner继承于Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。

Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks

4.KeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。

5.TotalOrderPartitioner这个类可以实现输出的全排序。不同于以上3个partitioner,这个类并不是基于hash的。在下一节里详细的介绍totalorderpartitioner。






TotalOrderPartitioner

每一个reducer的输出在默认的情况下都是有顺序的,但是reducer之间在输入是无序的情况下也是无序的。如果要实现输出是全排序的那就会用到TotalOrderPartitioner。

要使用TotalOrderPartitioner,得给TotalOrderPartitioner提供一个partition file。这个文件要求Key (这些key就是所谓的划分)的数量和当前reducer的数量-1相同并且是从小到大排列。对于为什么要用到这样一个文件,以及这个文件的具体细节待会还会提到。

TotalOrderPartitioner对不同Key的数据类型提供了两种方案:

1)        对于非BinaryComparable(参考附录A)类型的Key,TotalOrderPartitioner采用二分发查找当前的K所在的index。

例如reducer的数量为5,partition file 提供的4个划分为【2,4,6,8】。如果当前的一个key value pair 是<4,”good”>利用二分法查找到index=1,index+1=2那么这个key value pair将会发送到第二个reducer。如果一个key value pair为<4.5, “good”>那么二分法查找将返回-3,同样对-3加1然后取反就是这个key value pair 将要去的reducer。

对于一些数值型的数据来说,利用二分法查找复杂度是o(log (reducer count)),速度比较快。

2)         对于BinaryComparable类型的Key(也可以直接理解为字符串)。字符串按照字典顺序也是可以进行排序的。这样的话也可以给定一些划分,让不同的字符串key分配到不同的reducer里。这里的处理和数值类型的比较相近。

例如reducer的数量为5,partition file 提供了4个划分为【“abc”, “bce”, “eaa”, ”fhc”】那么“ab”这个字符串将会被分配到第一个reducer里,因为它小于第一个划分“abc”。

但是不同于数值型的数据,字符串的查找和比较不能按照数值型数据的比较方法。mapreducer采用的Tire tree的字符串查找方法。查找的时间复杂度o(m),m为树的深度,空间复杂度o(255^m-1)。是一个典型的空间换时间的案例。







Tire Tree

Tire tree的构建

假设树的最大深度为3,划分为【aaad ,aaaf, aaaeh,abbx 】

tairtree结构.jpg

tairtree结构


Mapreduce里的Tire tree主要有两种节点组成:
1)        Innertirenode
Innertirenode在mapreduce中是包含了255个字符的一个比较长的串。上图中的例子只包含了26个英文字母。

2)        叶子节点{unslipttirenode, singesplittirenode, leaftirenode}
Unslipttirenode 是不包含划分的叶子节点。
Singlesplittirenode 是只包含了一个划分点的叶子节点。
Leafnode是包含了多个划分点的叶子节点。(这种情况比较少见,达到树的最大深度才出现这种情况。在实际操作过程中比较少见)


Tire tree的搜索过程
接上面的例子:
1)假如当前 key value pair 这时会找到图中的leafnode,在leafnode内部使用二分法继续查找找到返回 aad在 划分数组中的索引。找不到会返回一个和它最接近的划分的索引。

2)假如找到singlenode,如果和singlenode的划分相同或小返回他的索引,比singlenode的划分大则返回索引+1。

3)假如找到nosplitnode则返回前面的索引。如将会返回abbx的在划分数组中的索引。


TotalOrderPartitioner的疑问

上面介绍了partitioner有两个要求,一个是速度另外一个是均衡负载。使用tire tree提高了搜素的速度,但是我们怎么才能找到这样的partition file 呢?让所有的划分刚好就能实现均衡负载。

InputSampler
输入采样类,可以对输入目录下的数据进行采样。提供了3种采样方法。

采样类结构图.jpg



采样类结构图




采样方式对比表:
类名称
采样方式
构造方法
效率
特点
SplitSampler<K,V>
对前n个记录进行采样
采样总数,划分数
最高

RandomSampler<K,V>
遍历所有数据,随机采样
采样频率,采样总数,划分数
最低

IntervalSampler<K,V>
固定间隔采样
采样频率,划分数

对有序的数据十分适用

writePartitionFile这个方法很关键,这个方法就是根据采样类提供的样本,首先进行排序,然后选定(随机的方法)和reducer数目-1的样本写入到partition file。这样经过采样的数据生成的划分,在每个划分区间里的key value pair 就近似相同了,这样就能完成均衡负载的作用。


TotalOrderPartitioner实例

  1. public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
  2.         implements Tool
  3. {
  4.     @Override
  5.     public int run(String[] args) throws Exception
  6.     {
  7.         JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
  8.         if (conf == null) {
  9.             return -1;
  10.         }
  11.         conf.setInputFormat(SequenceFileInputFormat.class);
  12.         conf.setOutputKeyClass(IntWritable.class);
  13.         conf.setOutputFormat(SequenceFileOutputFormat.class);
  14.         SequenceFileOutputFormat.setCompressOutput(conf, true);
  15.         SequenceFileOutputFormat
  16.                 .setOutputCompressorClass(conf, GzipCodec.class);
  17.         SequenceFileOutputFormat.setOutputCompressionType(conf,
  18.                 CompressionType.BLOCK);
  19.         conf.setPartitionerClass(TotalOrderPartitioner.class);
  20.         InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
  21.                 0.1, 10000, 10);
  22.         Path input = FileInputFormat.getInputPaths(conf)[0];
  23.         input = input.makeQualified(input.getFileSystem(conf));
  24.         Path partitionFile = new Path(input, "_partitions");
  25.         TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
  26.         InputSampler.writePartitionFile(conf, sampler);
  27.         // Add to DistributedCache
  28.         URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
  29.         DistributedCache.addCacheFile(partitionUri, conf);
  30.         DistributedCache.createSymlink(conf);
  31.         JobClient.runJob(conf);
  32.         return 0;
  33.     }
  34.     public static void main(String[] args) throws Exception {
  35.         int exitCode = ToolRunner.run(
  36.                 new SortByTemperatureUsingTotalOrderPartitioner(), args);
  37.         System.exit(exitCode);
  38.     }
  39. }
复制代码

附录A
Text 为BinaryComparable,WriteableComparable类型。
BooleanWritable、ByteWritable、DoubleWritable、MD5hash、IntWritable、FloatWritable、LongWritable、NullWriable等都为WriteableComparable。具体参考下图:
附录.JPG








加微信w3aboutyun,可拉入技术爱好者群

已有(1)人评论

跳转到指定楼层
狂飙中的海绵BB 发表于 2015-7-29 23:07:27
SequenceFileInputFormat,对二进制文本应该如何读取那?key是文件名?value是内容吗?对这个概念不太懂。望楼主科普下。谢谢。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条