丫丫 发表于 2015-10-12 13:56:29

Hadoop(6)MapReduce 性能调优:优化洗牌(shuffle)和排序阶段

本帖最后由 pig2 于 2015-10-19 19:54 编辑

问题导读

1.如何在MapReduce规避reduce的使用?
2.如何减少被洗牌的数据?
3.如何在过滤和投影后进一步减少运行时间?
4.如何利用用Comparator减少作业的排序时间?


static/image/hrline/4.gif


6.4.3 优化洗牌(shuffle)和排序阶段

洗牌和排序阶段都很耗费资源。洗牌需要在map和reduce任务之间传输数据,会导致过大的网络消耗。排序和合并操作的消耗也是很显著的。这一节将介绍一系列的技术来缓解洗牌和排序阶段的消耗。
技术46 规避使用reduce

Reduce在用于连接数据集的时候将会产生大量的网络消耗。
问题

需要考虑在MapReduce规避reduce的使用。
方案

通过将MapReduce参数setNumReduceTasks设置为0来创建一个只有map的作业。
讨论

洗牌和排序阶段一般都是用来连接数据集。但连接操作并不一定需要洗牌和排序,正如第4章中所介绍的。满足一定条件的连接可以只在map端运行。那么就只需要只有map的作业了。设置只有map的作业的命令如下。
job.setNumReduceTasks(0);

小结

一个只有map的作业的OutputFormat是和普通作业中reduce的OutputFormat一样。如图6.39所示。

如果无法规避reduce,那么就要尽量减小它对你的作业执行时间的影响。

技术47 过滤和投影
Map到Reduce之间传输数据要通过网络,这个成本很高。
问题

需要减少被洗牌的数据。
方案

减少map输出的每条记录的大小,并尽可能地减少map输出的数据量。
讨论

过滤和投影是关系运算中的概念,用以减少需要处理的数据。这些概念也可以用到MapReduce中减少map任务需要输出的数据。以下是过滤和投影的简明定义:
[*]过滤是减少map输出的数据量。
[*]投影是减少map输出的每条记录的大小。
以下是上述概念的演示代码:


1 Text outputKey = new Text();
2 Text outputValue = new Text();
3
4 @Override
5 public void map(LongWritable key, Text value,
6               OutputCollector<Text, Text> output,
7               Reporter reporter) throws IOException {
8                  
9   String v = value.toString();
10   
11   if (!v.startsWith("10.")) {
12         String[] parts = StringUtils.split(v, ".", 3);
13         outputKey.set(parts);
14         outputValue.set(parts);
15         output.collect(outputKey, outputValue);
16   }
17 }



小结

过滤和投影是在需要显著减少MapReduce作业运行时间时最容易的方法中的两种。如果已经应用了这两种方法,但还需要进一步减少运行时间。那么就可以考虑combine。
技术48 使用combine

Combine可以在map阶段进行聚合操作来减少需要发送到reduce的数据。它是一个map端的优化工具,以map的输出作为输入。
问题

需要在过滤和投影后进一步减少运行时间。
方案

定义一个combine。在作业代码中使用setCombinerClass来调用它。
讨论

在map输出数据到磁盘的过程中,有两个子过程:溢洒(spill)子过程,合并子过程。Combine在这两个子过程中都会被调用,如图6.40所示。为了让combine在分组数据中效率最大,可以在两个子过程调用combine之前进行初步(precursory)的排序。

与设置map类类似,作业使用setCombinClass来设置combine。
job.setCombinerClass(Combine.class);

Combine的实现必须严格遵从reduce的规格说明。这里将假定使用技术39种的map。将map的输出中的记录按照下述条件合并:第二个八进制数相同。代码如下。

1 public static class Combine implements Reducer<Text, Text, Text, Text> {
2      
3   @Override
4   public void reduce(Text key, Iterator<Text> values,
5                         OutputCollector<Text,
6                         Text> output,
7                         Reporter reporter) throws IOException {
8                        
9         Text prev = null;
10         while (values.hasNext()) {
11             Text t = values.next();
12             if (!t.equals(prev)) {
13               output.collect(key, t);
14             }
15             prev = ReflectionUtils.copy(job, t, prev);
16         }
17   }
18 }



Combine函数必须是可分布的(distributive)。如图6.40(在前面)所示,combine要被调用多次处理多个具有相同输入键的记录。这些记录的顺序是不可预测的。可分布函数是指,不论输入数据的顺序如何,最终的结果都一样。
小结

在MapReduce中combine非常有用,它能够减少map和reduce之间的网络传输数据和网络负载。下一个减少执行时间的有用工具就是二进制比较器。
技术49 用Comparator进行超快排序

MapReduce默认使用RawComparator对map的输出键进行比较排序。内置的Writable类(例如Text和IntWritable)是字节级实现。这样不用将字节形式的类解排列(unmarshal)成类对象。如果要通过WritableComparable实现自定义Writable,就有可能延长洗牌和排序阶段的时间,因为它需要进行解排列。
问题

存在自定义的Writable。需要减少作业的排序时间。
方案

实现字节级的Comparator来优化排序中的比较过程。
讨论

在MapReduce中很多阶段,排序是通过比较输出键来进行的。为了加快键排序,所有的map输出键必须实现WritableComparable接口。
1 public interface WritableComparable<T> extends Writable, Comparable<T> {2 3 }

如果对4.2.1中的Person类进行改造,实现代码如下。

1 public class Person implements WritableComparable<Person> {
2   private String firstName;
3   private String lastName;
4      
5   @Override
6   public int compareTo(Person other) {
7         int cmp = this.lastName.compareTo(other.lastName);
8         if (cmp != 0) {
9             return cmp;
10         }
11         return this.firstName.compareTo(other.firstName);
12   }
13 ...



这个Comparator的问题在于,如果要进行比较,就需要将字节形式的map的中间结果数据解排列成Writable形式。解排列要重新创建对象,因此成本很高。Hadoop中的自带的各种Writable类不但扩展了WritableComparable接口,也提供了基于WritableComparator类的自定义Comparator。代码如下。

1 public class WritableComparator implements RawComparator {
2
3   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
4      
5         try {
6             buffer.reset(b1, s1, l1);
7             key1.readFields(buffer);
8
9             buffer.reset(b2, s2, l2);
10             key2.readFields(buffer);
11         } catch (IOException e) {
12             throw new RuntimeException(e);
13         }
14         return compare(key1, key2);
15   }
16   
17   /** Compare two WritableComparables.
18   *
19   * <p> The default implementation uses the natural ordering,
20   * calling {@link
21   * Comparable#compareTo(Object)}. */
22   @SuppressWarnings("unchecked")
23   public int compare(WritableComparable a, WritableComparable b) {
24         return a.compareTo(b);
25   }
26   ...
27 }



要实现字节级的Comparator,需要重载compare方法。这里先学习一下IntWritable类如何实现这个方法。
1 public class IntWritable implements WritableComparable {
2
3   public static class Comparator extends WritableComparator {
4   
5         public Comparator() {
6             super(IntWritable.class);
7         }
8         
9         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10             int thisValue = readInt(b1, s1);
11             int thatValue = readInt(b2, s2);
12             return (thisValue<thatValue ? -1 :
13             (thisValue==thatValue ? 0 : 1));
14         }
15   }
16   
17   static {
18         WritableComparator.define(IntWritable.class, new Comparator());
19   }


如果只使用内置的Writable,那就没有必要实现WritableComparator。它们都自带。如果需要使用自定义的Writable作为输出键,那么就需要自定义WritableComparator。这里基于前述Person类来说明如何实现。在Person类中,有两个字符串类属性,firstName和lastName。使用writeUTF方法通过DataOutput输出它们。以下是实现代码。
1 private String firstName;
2 private String lastName;
3
4 @Override
5 public void write(DataOutput out) throws IOException {
6   out.writeUTF(lastName);
7   out.writeUTF(firstName);
8 }


首先需要理解Person对象是如何用字节形式表示的。writeUTF方法输出了字节长度(2个字节),字符内容(字符的长度,L1个字节)。如图6.41描述了字节是如何排列的。

假设需要对lastName和firstName进行字典式地比较(译注:就是看字典中的先后顺序)。显然不能直接用整个字节数组,因为其中还有字符长度。那么Comparator就需要足够聪明到能够跳过字符长度。以下是实现代码。
1 @Override
2 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
3
4   int lastNameResult = compare(b1, s1, b2, s2);
5   if (lastNameResult != 0) {
6         return lastNameResult;
7   }
8   int b1l1 = readUnsignedShort(b1, s1);
9   int b2l1 = readUnsignedShort(b2, s2);
10   return compare(b1, s1 + b1l1 + 2, b2, s2 + b2l1 + 2);11 }
12
13 public static int compare(byte[] b1, int s1, byte[] b2, int s2) {
14   int b1l1 = readUnsignedShort(b1, s1);
15   int b2l1 = readUnsignedShort(b2, s2);
16   return compareBytes(b1, s1 + 2, b1l1, b2, s2 + 2, b2l1);
17 }
18
19 public static int readUnsignedShort(byte[] b, int offset) {
20   int ch1 = b;
21   int ch2 = b;
22   return (ch1 << 8) + (ch2);
23 }


小结


writeUTF只支持小于65536字符的字符串类。对于人名来说,是足够了。大点的,可能就不行。这个时候就需要使用Hadoop的Text类来支持更大的字符串。Text类中的Comparator类的二进制字符串比较器的实现机制和刚才介绍的大致相当。(这个修饰真长。)那么针对Text类的lastName和firstName的Comparator的实现方式也会累死。


下一节将介绍如何减小数据倾斜的影响。

Hadoop(1)MapReduce 性能调优:性能测量(Measuring)
http://www.aboutyun.com/thread-15514-1-1.html

Hadoop(2)MapReduce 性能调优:理解性能瓶颈,诊断map性能瓶颈
http://www.aboutyun.com/thread-15517-1-1.html

Hadoop(3)MapReduce 性能调优:诊断reduce性能瓶颈
http://www.aboutyun.com/thread-15522-1-1.html

Hadoop(4)MapReduce 性能调优:诊断一般性能瓶颈
http://www.aboutyun.com/thread-15660-1-1.html


Hadoop(5)MapReduce 性能调优:诊断硬件性能瓶颈
http://www.aboutyun.com/thread-15534-1-1.html


Hadoop(6)MapReduce 性能调优:优化洗牌(shuffle)和排序阶段
http://www.aboutyun.com/thread-15545-1-1.html


Hadoop(7)MapReduce 性能调优:减小数据倾斜的性能损失
http://www.aboutyun.com/thread-15544-1-1.html

Hadoop(8)MapReduce 性能调优:优化MapReduce的用户JAVA代码
http://www.aboutyun.com/thread-15583-1-1.html


Hadoop(9)MapReduce 性能调优:优化数据序列化
http://www.aboutyun.com/thread-15658-1-1.html


Hadoop(10)MapReduce 文件处理:小文件
http://www.aboutyun.com/thread-15592-1-1.html

Hadoop(11)MapReduce 文件处理:基于压缩的高效存储(一)
http://www.aboutyun.com/thread-15626-1-1.html

Hadoop(12)MapReduce 文件处理:基于压缩的高效存储(二)
http://www.aboutyun.com/thread-15629-1-1.html



foreverfeng168 发表于 2015-10-12 15:22:28

期待写些如何解决数据倾斜的技术文章

wangzhenqiang 发表于 2015-10-20 16:58:52

我赞一个先{:soso_e104:}

xuezhiji 发表于 2016-5-6 16:42:14

不错,学习了

神罗天征 发表于 2019-3-22 10:10:33

不错的文章
页: [1]
查看完整版本: Hadoop(6)MapReduce 性能调优:优化洗牌(shuffle)和排序阶段