分享

MapReduce 学习笔记(1) 以及 Yarn 框架基本介绍

本帖最后由 PeersLee 于 2016-3-6 18:12 编辑
问题导读:

1、MapReduce 是什么?
2、如何完成 属于自己的 wordcount Demo?
3、MapReduce 程序 怎样提交到 yarn集群并运行?

4、MapReduce 程序  Yarn 框架上的运行流程是什么?





解决方案:

环境:windows +JDK1.7 + eclipse + SecureCRTPortable(  访问密码 1bef )
centos 6.5 + JDK 1.7 + hadoop2.4.1



MapReduce简介:

  • 概述:


Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。



-----------------------------------------------------------------------------------------------------------


Hadoop 将作业(job)分成若干个小任务(task) 来执行,这些小任务分为两类:Map任务 与 Reduce 任务。

  • Map任务:


Hadoop 将 MapReduce 的输入数据分成相同大小的小数据块即分片(input split)。之后Hadoop 为每个数据分片构建一个map 任务,该任务会运行我们自定义的map 函数从而处理分片中每条记录。

如果我们并行处理每个分片,并且数据分片比较小,那么整个数据处理过程将获得更好的负载均衡;另一方面,当分片切分的过小,管理分片的总时间和构建map 任务的总时间将决定整个作业的执行时间。所以,一个合理的分片大小趋近于 HDFS 的一个块的大小,默认大小为 64 MB ,具体可以根据集群调整这个默认值。

Hadoop 的map 任务 实现了 “数据本地化优化”。当 Hadoop 在存储数据的节点上运行 map 任务时,无需使用宝贵的集群带宽资源,因此该模式可以过得最佳性能。但是有时候 对于一个 map 任务来说,当存储 HDFS 数据块的三个节点都在运行其他 map 任务,这时候作业调度需要在三个备份中的某个数据寻求同个机架中空闲的机器来运行该 map 任务(就是如果不能本地化,那就找一个离数据节点最近的节点)。所以为了确保可以存储在单个节点上的最大输入块,最佳分片的大小应该与块大小相同。

Map任务将其输出写入本地磁盘。Map任务输出的结果不是最终结果,他的输出是Reduce 任务的输入,当最终结果(Reduce 输出的结果)产生之后,这个Map任务的输出的中间结果就可以删除了。假设我们使用HDFS 去存储这个结果,并实现备份不免会有小题大做。当Map 将他的输出传递给 Reduce 之前失败了,Hadoop 会在另一个节点去重新构建这个map任务。

  • Reduce 任务:


Reduce 任务不具备数据本地化的优势。对于单个 Reduce 任务来说,Hadoop 会将所有的 Map 任务输出的结果排序,然后发送给该 Reduce 任务来处理。这些数据在 Reduce 端进行合并,之后使用我们自己写的 Reduce 函数去进行分析。Reduce 的输出是整个 MapReduce 程序的最终输出结果,所以Hadoop 会将 Reduce 的输出写到HDFS中,实现可靠存储。在讲 Reduce 结果写入 HDFS 中时需要占用网络带宽。

一个 Reduce 任务的 MapReduce 数据流:

2016-03-04_160350.png

如果有好多个 Reduce 任务,每个 Map 任务就会针对输出进行分区,也可以当成为每个 Reduce 任务分配一个区(数据来源),因此每一个 Reduce 任务的输入都来自许多个 map 任务。这个map 任务 与 Reduce 任务之间的数据流被称为 混洗(shuffle),一般shuffle 过程是很复杂的。

多个 Reduce 任务的 MapReduce 数据流:
2016-03-04_220843.png




MapReduce Demo(Word count 开发流程及运行机制讲解):

  • 添加Hadoop 封装的Jar 包
点击项目的名字之后 Alt + Enter 进入 参数设置界面如下图


2016-03-05_183209.png

选择Java Build Path 进入界面如下

2016-03-05_183557.png

选择User Library 如下

2016-03-05_183617.png

Next > 之后,进入选择 Library 界面 选择自己需要的User Library

2016-03-05_184018.png

注:我之前已经创建了 hadoop241 Library ,下面介绍创建自己的 User Library 方法

点击上面图片的 User Library 之后出现如下图

2016-03-05_184540.png

【new】

2016-03-05_184552.png

【ok】-> 【Add External JARs】-> 选择自己需要的 Jar 包(hadoop-2.4.1\share\hadoop\ 下的各个文件夹中)

2016-03-05_185119.png

最后 【Finish】 -> 【Ok】

  • 代码




Mapper:

[mw_shl_code=java,true]package peerslee.hadoop.wordcount;

import java.io.IOException;
import java.io.StringWriter;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//4个泛型中,前两个是Mapper 的输入数据的类型,后两个是Mapper 的输出类型(同时为Reducer 的输入类型)
//Mapper 和 Reducer都是以 key-value 对的形式进行封装的
//默认情况:Mapper 的key 是要处理的文本(输入数据)中的一行的起始偏移量(Long),这一行的内容 为value(String)
//由于需要网络传输,所以要用字节流(序列化)
public class WCMapeer extends Mapper<LongWritable, Text, Text, LongWritable> {

        //重写map 方法,拿一行数据就执行一次这个方法
        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                //具体业务逻辑
                //我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
                //key -> 行起始偏移量, value -> 行内容
                 
                String line = value.toString();
               
                //StringUtils.split() 可以提取文本中的单词
                String[] words = StringUtils.split(line, " ");
               
                //遍历单词数组,输出 key-value 形式
                for(String word : words) {
                        context.write(new Text(word), new LongWritable(1));
                }
        }        
}[/mw_shl_code]

------------------------------------------------------------------------

Reducer:

[mw_shl_code=java,true]package peerslee.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

        
        //在框架中 Mapper处理完成之后,将所有 key-value 对缓存起来,进行分组,传递一个组,进行一次Reducer
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
        
                long count = 0;
               
                //遍历value的list,累加计算得出单词个数
                for(LongWritable value:values) {
                        count += value.get();
                }
               
                //输出这个单词的统计结果
                context.write(key, new LongWritable(count));
               
        }
}
[/mw_shl_code]

Runner:

[mw_shl_code=java,true]package peerslee.hadoop.wordcount;

import java.io.File;                           

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 用来描述一个特定的作业,说明这个作业使用的Mapper 和  Reducer 具体是哪个类
* 还可以指定这个作业 要处理的数据所在的路径 以及 指定这个作业的输出结果应该放到哪个路径
*/
public class WCRunner {
        public static void main(String []args) throws Exception {
                Configuration conf = new Configuration();
               
                Job wCJob = Job.getInstance(conf);
               
                //设置整个作业使用的类 所在的jar包
                wCJob.setJarByClass(WCRunner.class);
               
                //该作业使用的 Mapper 和 Reducer 的类
                wCJob.setMapperClass(WCMapeer.class);
                wCJob.setReducerClass(WCReducer.class);

                //指定Reducer 输出数据的类型
                wCJob.setOutputKeyClass(Text.class);
                wCJob.setOutputValueClass(LongWritable.class);
               
                //指定Mapper 输出数据的类型
                wCJob.setMapOutputKeyClass(Text.class);
                wCJob.setMapOutputValueClass(LongWritable.class);
               
                //指定要处理的输入数据所存放的位置
                FileInputFormat.setInputPaths(wCJob, new Path("/wc/input/"));
               
                //指定要处理的输出数据所村范的位置
                FileOutputFormat.setOutputPath(wCJob, new Path("/wc/output/"));
               
                //将作业交给集群运行
                wCJob.waitForCompletion(true);
               
                 
               
        }
}
[/mw_shl_code]

  • 在服务器上运行集群模式


(1)从eclipse 中导出Jar 包

右键项目名称选择【export】之后

2016-03-05_194644.png

选择【JAR Files】之后

2016-03-05_194701.png

选择输出路径之后【Finish】完成

(2)上传到hadoop 集群中

2016-03-05_195038.png

(3)将我们要分析的文本上传到HDFS的指定目录中(输出目录一定要保证不存在)

(4) 命令:hadoop jar wc.jar peerslee.hadoop.wordcount.WCRunner


[mw_shl_code=bash,true]16/03/05 04:18:30 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/05 04:18:31 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/05 04:18:32 INFO input.FileInputFormat: Total input paths to process : 1
16/03/05 04:18:32 INFO mapreduce.JobSubmitter: number of splits:1
16/03/05 04:18:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457180269124_0001
16/03/05 04:18:33 INFO impl.YarnClientImpl: Submitted application application_1457180269124_0001
16/03/05 04:18:33 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457180269124_0001/
16/03/05 04:18:33 INFO mapreduce.Job: Running job: job_1457180269124_0001
16/03/05 04:19:06 INFO mapreduce.Job: Job job_1457180269124_0001 running in uber mode : false
16/03/05 04:19:06 INFO mapreduce.Job:  map 0% reduce 0%
16/03/05 04:19:12 INFO mapreduce.Job:  map 100% reduce 0%
16/03/05 04:19:19 INFO mapreduce.Job:  map 100% reduce 100%
16/03/05 04:19:19 INFO mapreduce.Job: Job job_1457180269124_0001 completed successfully
16/03/05 04:19:19 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=345
                FILE: Number of bytes written=186439
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=229
                HDFS: Number of bytes written=90
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=3829
                Total time spent by all reduces in occupied slots (ms)=4136
                Total time spent by all map tasks (ms)=3829
                Total time spent by all reduce tasks (ms)=4136
                Total vcore-seconds taken by all map tasks=3829
                Total vcore-seconds taken by all reduce tasks=4136
                Total megabyte-seconds taken by all map tasks=3920896
                Total megabyte-seconds taken by all reduce tasks=4235264
        Map-Reduce Framework
                Map input records=8
                Map output records=21
                Map output bytes=297
                Map output materialized bytes=345
                Input split bytes=99
                Combine input records=0
                Combine output records=0
                Reduce input groups=11
                Reduce shuffle bytes=345
                Reduce input records=21
                Reduce output records=11
                Spilled Records=42
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=196
                CPU time spent (ms)=1150
                Physical memory (bytes) snapshot=216961024
                Virtual memory (bytes) snapshot=725647360
                Total committed heap usage (bytes)=136908800
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=130
        File Output Format Counters
                Bytes Written=90[/mw_shl_code]

(5)命令: hadoop fs -cat /wc/output/part-r-00000

[mw_shl_code=bash,true]English 2
dislike 1
good    1
hello   5
learnning      2
li      2
like    1
morning 1
sau     1
wang    3
zhang   2[/mw_shl_code]




yarn 框架运行机制:

  • 文字描述


执行 hadoop jar xxx.xxx.xxx 命令之后 将在节点中 启动一个Run Jar的进程,之后会产生MRAppMaster进程(作业控制功能模块),Run Jar 在MRAppMaster 启动之后就可以消失,之后MRAppMaster 会产生Yarn Child 进程(map task 与 reduce task),当reduce task 完成时,最后一个Yarn Child 也会消失,最后MRAppMaster 注销自己。


  • 过程图:



2016-03-06_161658.png






已有(6)人评论

跳转到指定楼层
yundata 发表于 2016-3-7 08:13:34
很好,收藏了。
回复

使用道具 举报

congra321 发表于 2016-3-15 12:22:11
下载学习学习

回复

使用道具 举报

mc123612997 发表于 2016-4-7 11:25:52
学习,收藏了
回复

使用道具 举报

xuezhiji 发表于 2016-5-4 14:12:08
不错,学习了
回复

使用道具 举报

Rommy.Yang 发表于 2016-6-3 23:46:55
不错,不过用插件可以直接在eclipse中跑
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条