本帖最后由 xioaxu790 于 2014-5-20 10:15 编辑
问题导读:
1、优化MapReduce有什么有效方法 ?
2、Hadoop流运行机制 ?
MapReduce任务的优化
相信每个程序员在编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”。同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题。
MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化。这其中,又包含六个方面的内容。
1. 任务调度
任务调度是Hadoop中非常重要的一环,这个优化又涉及两个方面的内容。计算方面:Hadoop总会优先将任务分配给空闲的机器,使所有的任务能公平地分享系统资源。I/O方面:Hadoop会尽量将Map任务分配给InputSplit所在的机器,以减少网络I/O的消耗。
2. 数据预处理与InputSplit的大小
MapReduce任务擅长处理少量的大数据,而在处理大量的小数据时,MapReduce的性能就会逊色很多。因此在提交MapReduce任务前可以先对数据进行一次预处理,将数据合并以提高MapReduce任务的执行效率,这个办法往往很有效。如果这还不行,可以参考Map任务的运行时间,当一个Map任务只需要运行几秒就可以结束时,就需要考虑是否应该给它分配更多的数据。通常而言,一个Map任务的运行时间在一分钟左右比较合适,可以通过设置Map的输入数据大小来调节Map的运行时间。在FileInputFormat中(除了CombineFileInputFormat),Hadoop会在处理每个Block后将其作为一个InputSplit,因此合理地设置block块大小是很重要的调节方式。除此之外,也可以通过合理地设置Map任务的数量来调节Map任务的数据输入。
3. Map和Reduce任务的数量
合理地设置Map任务与Reduce任务的数量对提高MapReduce任务的效率是非常重要的。默认的设置往往不能很好地体现出MapReduce任务的需求,不过,设置它们的数量也要有一定的实践经验。
首先要定义两个概念—Map/Reduce任务槽。Map/Reduce任务槽就是这个集群能够同时运行的Map/Reduce任务的最大数量。比如,在一个具有1200台机器的集群中,设置每台机器最多可以同时运行10个Map任务,5个Reduce任务。那么这个集群的Map任务槽就是12000,Reduce任务槽是6000。任务槽可以帮助对任务调度进行设置。
设置MapReduce任务的Map数量主要参考的是Map的运行时间,设置Reduce任务的数量就只需要参考任务槽的设置即可。一般来说,Reduce任务的数量应该是Reduce任务槽的0.95倍或是1.75倍,这是基于不同的考虑来决定的。当Reduce任务的数量是任务槽的0.95倍时,如果一个Reduce任务失败,Hadoop可以很快地找到一台空闲的机器重新执行这个任务。当Reduce任务的数量是任务槽的1.75倍时,执行速度快的机器可以获得更多的Reduce任务,因此可以使负载更加均衡,以提高任务的处理速度。
4. Combine函数
Combine函数是用于本地合并数据的函数。在有些情况下,Map函数产生的中间数据会有很多是重复的,比如在一个简单的WordCount程序中,因为词频是接近与一个zipf分布的,每个Map任务可能会产生成千上万个记录,若将这些记录一一传送给Reduce任务是很耗时的。所以,MapReduce框架运行用户写的combine函数用于本地合并,这会大大减少网络I/O操作的消耗。此时就可以利用combine函数先计算出在这个Block中单词the的个数。合理地设计combine函数会有效地减少网络传输的数据量,提高MapReduce的效率。
在MapReduce程序中使用combine很简单,只需在程序中添加如下内容:
job.setCombinerClass(combine.class);
在WordCount程序中,可以指定Reduce类为combine函数,具体如下:
job.setCombinerClass(Reduce.class);
5. 压缩
编写MapReduce程序时,可以选择对Map的输出和最终的输出结果进行压缩(同时可以选择压缩方式)。在一些情况下,Map的中间输出可能会很大,对其进行压缩可以有效地减少网络上的数据传输量。对最终结果的压缩虽然会减少数据写HDFS的时间,但是也会对读取产生一定的影响,因此要根据实际情况来选择(第7章中提供了一个小实验来验证压缩的效果)。
6. 自定义comparator
在Hadoop中,可以自定义数据类型以实现更复杂的目的,比如,当读者想实现k-means算法(一个基础的聚类算法)时可以定义k个整数的集合。自定义Hadoop数据类型时,推荐自定义comparator来实现数据的二进制比较,这样可以省去数据序列化和反序列化的时间,提高程序的运行效率
Hadoop流
Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或Reduce函数。Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口。因此,任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数。
举个最简单的例子(本例的运行环境:Ubuntu,Hadoop-0.20.2):
- bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat –reducer usr/bin/wc
复制代码
从这个例子中可以看到,Hadoop流引入的包是hadoop-0.20.2-streaming.jar,并且具有如下命令:
- -input 指明输入文件路径
- -output 指明输出文件路径
- -mapper 指定map函数
- -reducer 指定reduce函数
复制代码
Hadoop流的工作原理
先来看Hadoop流的工作原理。在上例中,Map和Reduce都是Linux内的可执行文件,更重要的是,它们接受的都是标准输入(stdin),输出的都是标准输出(stdout)。如果大家熟悉Linux,那么对它们一定不会陌生。执行上一节中的示例程序的过程如下所示。
程序的输入与WordCount程序是一样的,具体如下:
- file01:
- hello world bye world
- file02
- hello hadoop bye hadoop
复制代码
输入命令:
- bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer /usr/bin/wc
复制代码
显示:
- packageJobJar: [/root/tmp/hadoop-unjar7103575849190765740/] [] /tmp/streamjob2314757737747407133.jar tmpDir=null
- 11/01/23 02:07:36 INFO mapred.FileInputFormat: Total input paths to process : 2
- 11/01/23 02:07:37 INFO streaming.StreamJob: getLocalDirs(): [/root/tmp/mapred/local]
- 11/01/23 02:07:37 INFO streaming.StreamJob: Running job: job_201101111819_0020
- 11/01/23 02:07:37 INFO streaming.StreamJob: To kill this job, run:
- 11/01/23 02:07:37 INFO streaming.StreamJob: /root/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201101111819_0020
- 11/01/23 02:07:37 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201101111819_0020
- 11/01/23 02:07:38 INFO streaming.StreamJob: map 0% reduce 0%
- 11/01/23 02:07:47 INFO streaming.StreamJob: map 100% reduce 0%
- 11/01/23 02:07:59 INFO streaming.StreamJob: map 100% reduce 100%
- 11/01/23 02:08:02 INFO streaming.StreamJob: Job complete: job_201101111819_0020
- 11/01/23 02:08:02 INFO streaming.StreamJob: Output: output
复制代码
程序的输出是:
复制代码
wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。
这里,可以看看上一篇内容:MapReduce: 提高MapReduce性能的七点建议
|