分享

MapReduce 学习笔记(2)

PeersLee 发表于 2016-3-12 22:38:21 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 10101
本帖最后由 PeersLee 于 2016-3-23 16:02 编辑
问题导读:

1、MR程序 如何进行 本地模式运行?
2、MR程序 如何进行 集群模式运行?

3、怎样区分 字节流 与 字符流?
4、如何用 MapperReduce 计算框架实现 流量求和功能?

5、如何实现hadoop的自定义排序?
6、如何实现mr程序中自定义分组?
7、shuffle机制是如何工作的?



解决方案:

环境:

centos 6.5 + jdk 1.7 + hadoop 2.4.1



MR程序的本地模式:

Linux(已进行 hadoop 2.4.1 的位分布式安装,下文不再注释该条件) 上的 eclipse 直接 进行本地模式运行:

不添加yarn相关的配置,也会提交给localjobrunner执行的

(1)确保 yarn 的 jar 包已经在你的 User Library 中:

2016-03-07_203633.png

(2)需要修改的代码:

2016-03-07_200638.png

(3)运行 WCRunner.java 的 main 函数:

2016-03-07_204003.png

(4)查看输出结果:

2016-03-07_204651.png


2016-03-07_204640.png
---------------------------------------------------------------------------------------


linux 的 eclipse 操作Hdfs  进行本地模式运行:

(1)启动 HDFS -> start-dfs.sh

2016-03-07_221251.png

(2)修改代码如下:

2016-03-07_205132.png

(3)运行 在 HDFS 上检查计算结果

2016-03-07_221104.png

(4)当出现这个异常时
2016-03-07_220936.png


要将两个配置文件copy 到src 目录下就可以

2016-03-07_221336.png



MR程序的本地模式:

打成jar 包 上传到 HDFS 上 进行集群模式运行:

MapReduce 学习笔记(1)

---------------------------------------------------------------------------------

linux 的eclipse  进行集群模式运行:

(1)在工程src目录下加入 mapred-site.xml  和  yarn-site.xml


(2)工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数 conf.set("mapreduce.job.jar","wc.jar");      





字符流(处理String,Long对象) -序列化->  字节流(二进制对象):

字节流 FileInputStream FileOutputStream
字符流 FileReader FileWriter


字符流处理的单元为2个字节的Unicode字符,分别操作字符、字符数组或字符串,而字节流处理单元为1个字节,操作字节和字节数组。所以字符流是由Java虚拟机将字节转化为2个字节的Unicode字符为单位的字符而成的,所以它对多国语言支持性比较好!如果是音频文件、图片、歌曲,就用字节流好点,如果是关系到中文(文本)的,用字符流好点.所有文件的储存是都是字节(byte)的储存,在磁盘上保留的并不是文件的字符而是先把字符编码成字节,再储存这些字节到磁盘。在读取文件(特别是文本文件)时,也是一个字节一个字节地读取以形成字节序列.字节流可用于任何类型的对象,包括二进制对象,而字符流只能处理字符或者字符串字节流提供了处理任何类型的IO操作的功能,但它不能直接处理Unicode字符,而字符流就可以。字节流转换成字符流可以用 InputSteamReader OutputStreamWriter





Mapreduce框架实现 流量求和 功能:

  • 思路:


Mapper 模块 需要从数据中抽取我们需要的有用的数据段,Reducer 模块 得到 Mapper 截取完的数据块进行求和计算得出每个用的上行流量与下行流量和流量和。这里Mapper 的输出数据类型需要我们自己去封装一个 JavaBean 来包装我们使用的数据,这里涉及到hadoop 的序列化接口 Writable。
2016-03-10_221530.png

  • 代码:

FlowBean:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
*
* @author peerslee
*
*FlowBean 是我们自己定义的数据类型,要在hadoop 的所有节点上进行传递,首先要遵循hadoop 的序列化机制,
*所以要实现hadoop 的序列化接口 Writable(这是接口,很重要)
*
*hadoop 的序列化机制不会传递对象的继承结构(jdk 自带的序列化机制会携带对象的继承机制)
*/

public class FlowBean implements Writable{

        //私有域
        private String phoneNB;
        private long u_flow;
        private long d_flow;
        private long s_flow;
        
        //反射机制需要 空参构造函数
        public FlowBean() {}
        
        //构造器
        public FlowBean(String phoneNB, long u_flow, long d_flow) {
                this.phoneNB = phoneNB;
                this.u_flow = u_flow;
                this.d_flow = d_flow;
                this.s_flow = u_flow + d_flow;
        }


        public String getPhoneNB() {
                return phoneNB;
        }

        public void setPhoneNB(String phoneNB) {
                this.phoneNB = phoneNB;
        }

        public long getU_flow() {
                return u_flow;
        }

        public void setU_flow(long u_flow) {
                this.u_flow = u_flow;
        }

        public long getD_flow() {
                return d_flow;
        }

        public void setD_flow(long d_flow) {
                this.d_flow = d_flow;
        }

        public long getS_flow() {
                return s_flow;
        }

        public void setS_flow(long s_flow) {
                this.s_flow = s_flow;
        }

        //将对象序列化到流中
        @Override
        public void write(DataOutput out) throws IOException {
               
                //序列化
                out.writeUTF(phoneNB);
                out.writeLong(u_flow);
                out.writeLong(d_flow);
                out.writeLong(s_flow);
               
        }

        //将数据流反序列话成对象
        @Override
        public void readFields(DataInput in) throws IOException {
               
                //反序列化
                //注意数据类型要和序列化时保持一致,怎么读进去怎么读出来
                phoneNB = in.readUTF();
                u_flow = in.readLong();
                d_flow = in.readLong();
                s_flow = in.readLong();
        }

        //重写 toString 方法
        @Override
        public String toString() {
                return "" + u_flow + "\t" + d_flow + "\t" + s_flow;
        }
}
[/mw_shl_code]

Mapper:

[mw_shl_code=java,true]package peerslee.hadoop.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
* mapper 的业务逻辑:
* 按行读取数据,对每一行数据进行截取,得到我们需要的数据段,封装成v-k 模式发送个 reduce
*/

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                        throws IOException, InterruptedException {
               
                //按行读取数据
                String line = value.toString();
               
                //进行截取
                String []fields = StringUtils.split(line, "\t");
               
                //得到我们需要的数据
                String phoneNB = fields[1];
                long u_flow = Long.parseLong(fields[7]);
                long d_flow = Long.parseLong(fields[8]);
               
                //封装数据并输出
                context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
               
               
        }
}
[/mw_shl_code]

Reducer:

[mw_shl_code=java,true]package peerslee.hadoop.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
* reducer 的业务逻辑:
* 框架每传递一组数据<手机号,flowbean 的数组>,调用一次Reduce 函数
* 该框架要对 value进行遍历、加和、输出
*/

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        
        @Override
        protected void reduce(Text key, Iterable<FlowBean> value,Context context)
                        throws IOException, InterruptedException {
               
                long u_flow_sum = 0;
                long d_flow_sum = 0;
               
                //累加求和
                for(FlowBean bean : value) {
                        u_flow_sum += bean.getU_flow();
                        d_flow_sum += bean.getD_flow();
                }
               
                //封装输出
                context.write(key, new FlowBean(key.toString(), u_flow_sum, d_flow_sum));
               
        }
}
[/mw_shl_code]

Runner:

[mw_shl_code=java,true]package peerslee.hadoop.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowSumRunner extends Configured implements Tool{

        @Override
        public int run(String[] args) throws Exception {
               
                //job
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
               
                //setJar
                job.setJarByClass(FlowSumRunner.class);
               
                //MapperClass & ReducerClass
                job.setMapperClass(FlowSumMapper.class);
                job.setReducerClass(FlowSumReducer.class);
               
                //OutputKey - Outputvalue
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(FlowBean.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(FlowBean.class);
               
                //InputPaths - OutputPaths
                // 这个包 hadoop.mapreduce.lib
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
               
                return job.waitForCompletion(true) ? 1 : 0;
               
        }

        public static void main(String[] args) throws Exception {
                int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
                System.out.println(res);
        }
        
}
[/mw_shl_code]

  • 结果:



[mw_shl_code=bash,true][hadoop@master ~]$ hadoop fs -put HTTP_20130313143750.dat /flow/input
[hadoop@master ~]$ hadoop fs -rm -r /flow/output
16/03/10 03:39:17 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /flow/output
[hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.flowsum.FlowSumRunner /flow/input /flow/output
16/03/10 03:39:30 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/10 03:39:30 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/10 03:39:30 INFO input.FileInputFormat: Total input paths to process : 1
16/03/10 03:39:31 INFO mapreduce.JobSubmitter: number of splits:1
16/03/10 03:39:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457609295138_0002
16/03/10 03:39:31 INFO impl.YarnClientImpl: Submitted application application_1457609295138_0002
16/03/10 03:39:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457609295138_0002/
16/03/10 03:39:31 INFO mapreduce.Job: Running job: job_1457609295138_0002
16/03/10 03:39:38 INFO mapreduce.Job: Job job_1457609295138_0002 running in uber mode : false
16/03/10 03:39:38 INFO mapreduce.Job:  map 0% reduce 0%
16/03/10 03:39:43 INFO mapreduce.Job:  map 100% reduce 0%
16/03/10 03:39:49 INFO mapreduce.Job:  map 100% reduce 100%
16/03/10 03:39:49 INFO mapreduce.Job: Job job_1457609295138_0002 completed successfully
16/03/10 03:39:50 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=1122
                FILE: Number of bytes written=188013
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2332
                HDFS: Number of bytes written=526
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=3145
                Total time spent by all reduces in occupied slots (ms)=3462
                Total time spent by all map tasks (ms)=3145
                Total time spent by all reduce tasks (ms)=3462
                Total vcore-seconds taken by all map tasks=3145
                Total vcore-seconds taken by all reduce tasks=3462
                Total megabyte-seconds taken by all map tasks=3220480
                Total megabyte-seconds taken by all reduce tasks=3545088
        Map-Reduce Framework
                Map input records=22
                Map output records=22
                Map output bytes=1072
                Map output materialized bytes=1122
                Input split bytes=118
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=1122
                Reduce input records=22
                Reduce output records=21
                Spilled Records=44
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=186
                CPU time spent (ms)=1020
                Physical memory (bytes) snapshot=216825856
                Virtual memory (bytes) snapshot=725782528
                Total committed heap usage (bytes)=136908800
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=2214
        File Output Format Counters
                Bytes Written=526
1
[hadoop@master ~]$ hadoop fs -cat /flow/output/part-r-00000
13480253104     180     200     380
13502468823     102     7335    7437
13560439658     5892    400     6292
13600217502     186852  200     187052
13602846565     12      1938    1950
13660577991     9       6960    6969
13719199419     0       200     200
13726230503     2481    24681   27162
13760778710     120     200     320
13823070001     180     200     380
13826544101     0       200     200
13922314466     3008    3720    6728
13925057413     63      11058   11121
13926251106     0       200     200
13926435656     1512    200     1712
15013685858     27      3659    3686
15920133257     20      3156    3176
15989002119     3       1938    1941
18211575961     12      1527    1539
18320173382     18      9531    9549
84138413        4116    1432    5548[/mw_shl_code]






hadoop的自定义排序实现:

  • 代码:

FlowBean:

[mw_shl_code=java,true]package peerslee.hadoop.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
* 重写 WritableComparable 接口中 compareTo{} 方法
*/
public class FlowBean implements WritableComparable<FlowBean>{

        //私有域
        private String phoneNB;
        private long u_flow;
        private long d_flow;
        private long s_flow;
        
        //反射机制需要 空参构造函数
        public FlowBean() {}
        
        //构造器
        public FlowBean(String phoneNB, long u_flow, long d_flow) {
                this.phoneNB = phoneNB;
                this.u_flow = u_flow;
                this.d_flow = d_flow;
                this.s_flow = u_flow + d_flow;
        }


        public String getPhoneNB() {
                return phoneNB;
        }

        public void setPhoneNB(String phoneNB) {
                this.phoneNB = phoneNB;
        }

        public long getU_flow() {
                return u_flow;
        }

        public void setU_flow(long u_flow) {
                this.u_flow = u_flow;
        }

        public long getD_flow() {
                return d_flow;
        }

        public void setD_flow(long d_flow) {
                this.d_flow = d_flow;
        }

        public long getS_flow() {
                return s_flow;
        }

        public void setS_flow(long s_flow) {
                this.s_flow = s_flow;
        }

        //将对象序列化到流中
        @Override
        public void write(DataOutput out) throws IOException {
               
                //序列化
                out.writeUTF(phoneNB);
                out.writeLong(u_flow);
                out.writeLong(d_flow);
                out.writeLong(s_flow);
               
        }

        //将数据流反序列话成对象
        @Override
        public void readFields(DataInput in) throws IOException {
               
                //反序列化
                //注意数据类型要和序列化时保持一致,怎么读进去怎么读出来
                phoneNB = in.readUTF();
                u_flow = in.readLong();
                d_flow = in.readLong();
                s_flow = in.readLong();
        }

        //重写 toString 方法
        @Override
        public String toString() {
                return "" + phoneNB + " " + u_flow + "\t" + d_flow + "\t" + s_flow;
        }

        @Override
        public int compareTo(FlowBean o) {
                /*
                 * 原始是 第一个数 > 第二个数 返回 1 否则 返回 -1 这样是升序
                 * 我们改成自己需要的降序
                 */
               
                return s_flow > o.getS_flow() ? -1 : 1;
        }
}
[/mw_shl_code]

SortMR:

[mw_shl_code=java,true]package peerslee.hadoop.flow;

/**
* 对FlowSum 的输出结果按总流量进行降序排序
*/

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortMR {
        
        //静态内部类 来完成map 和 reduce
        
        //mapper
        public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
               
                @Override
                protected void map(LongWritable key, Text value,Context context)
                                                throws IOException, InterruptedException {
                        //读取
                        String line = value.toString();
                        
                        //截取
                        String fields[] = StringUtils.split(line, "\t");
                        
                        //封装并输出
                        String phoneNB = fields[0];
                        long u_flow = Long.parseLong(fields[1]);
                        long d_flow = Long.parseLong(fields[2]);
                        
                        context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get());
                        
                }
        }
        
        //reducer
        public static class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
        
                @Override
                protected void reduce(FlowBean key, Iterable<NullWritable> value,Context context)
                                                throws IOException, InterruptedException {
                        //将key 输出
                        context.write(key, NullWritable.get());
                }
        }
        
        //runner
        public static void main(String[] args) throws Exception {
                //得到job
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
               
                //设置 jar、mapperclass、reduceclass、outputkeyclass、outputvalueclass
                job.setJarByClass(SortMR.class);
                job.setMapperClass(SortMapper.class);
                job.setReducerClass(SortReducer.class);
                job.setOutputKeyClass(FlowBean.class);
                job.setOutputValueClass(NullWritable.class);
               
                //inputpath、outputpath
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
               
                //waitForCompletion
                System.exit(job.waitForCompletion(true)?0:1);
        }
        
        
        
}
[/mw_shl_code]

  • 测试:


[mw_shl_code=shell,true][hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.flow.SortMR  /flow/output/part-r-00000 /flow/sort_output
16/03/11 05:57:24 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/11 05:57:25 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/11 05:57:25 INFO input.FileInputFormat: Total input paths to process : 1
16/03/11 05:57:25 INFO mapreduce.JobSubmitter: number of splits:1
16/03/11 05:57:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457700775861_0002
16/03/11 05:57:26 INFO impl.YarnClientImpl: Submitted application application_1457700775861_0002
16/03/11 05:57:26 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457700775861_0002/
16/03/11 05:57:26 INFO mapreduce.Job: Running job: job_1457700775861_0002
16/03/11 05:57:34 INFO mapreduce.Job: Job job_1457700775861_0002 running in uber mode : false
16/03/11 05:57:34 INFO mapreduce.Job:  map 0% reduce 0%
16/03/11 05:57:42 INFO mapreduce.Job:  map 100% reduce 0%
16/03/11 05:57:48 INFO mapreduce.Job:  map 100% reduce 100%
16/03/11 05:57:49 INFO mapreduce.Job: Job job_1457700775861_0002 completed successfully
16/03/11 05:57:49 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=822
                FILE: Number of bytes written=186803
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=634
                HDFS: Number of bytes written=526
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4789
                Total time spent by all reduces in occupied slots (ms)=3885
                Total time spent by all map tasks (ms)=4789
                Total time spent by all reduce tasks (ms)=3885
                Total vcore-seconds taken by all map tasks=4789
                Total vcore-seconds taken by all reduce tasks=3885
                Total megabyte-seconds taken by all map tasks=4903936
                Total megabyte-seconds taken by all reduce tasks=3978240
        Map-Reduce Framework
                Map input records=21
                Map output records=21
                Map output bytes=774
                Map output materialized bytes=822
                Input split bytes=108
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=822
                Reduce input records=21
                Reduce output records=21
                Spilled Records=42
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=237
                CPU time spent (ms)=1370
                Physical memory (bytes) snapshot=217374720
                Virtual memory (bytes) snapshot=725647360
                Total committed heap usage (bytes)=136908800
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=526
        File Output Format Counters
                Bytes Written=526
[hadoop@master ~]$ hadoop fs -cat  /flow/sort_output/part-r-00000
13600217502 186852      200     187052
13726230503 2481        24681   27162
13925057413 63  11058   11121
18320173382 18  9531    9549
13502468823 102 7335    7437
13660577991 9   6960    6969
13922314466 3008        3720    6728
13560439658 5892        400     6292
84138413 4116   1432    5548
15013685858 27  3659    3686
15920133257 20  3156    3176
13602846565 12  1938    1950
15989002119 3   1938    1941
13926435656 1512        200     1712
18211575961 12  1527    1539
13480253104 180 200     380
13823070001 180 200     380
13760778710 120 200     320
13826544101 0   200     200
13926251106 0   200     200
13719199419 0   200     200[/mw_shl_code]





mr程序中自定义分组的实现:

  • 代码:


AreaPartioner:

[mw_shl_code=java,true]package peerslee.hadoop.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

/*
* 继承partitioner 定义 属于自己的分组方式
*/
public class AreaPartioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{

        /*
         * 手机归属地字典(HashMap 可以快速存取 key-value)
         */
        public static HashMap<String, Integer> areaMap = new HashMap<>();
        
        static{
                areaMap.put("135", 0);
                areaMap.put("136", 1);
                areaMap.put("137", 2);
                areaMap.put("138", 3);
                areaMap.put("139", 4);
        }
        
        /*
         * 从key中截取出手机号,通过查询手机号归属地,返回不同的组号(partition number)
         */
        @Override
        public int getPartition(KEY key, VALUE value, int numPartition) {
               
                //HashMap 通过key get value
                int areaNum = areaMap.get(key.toString().substring(0,3))==null?5:areaMap.get(key.toString().substring(0,3));
               
                return areaNum;
        }

}
[/mw_shl_code]

FlowSumArea:

[mw_shl_code=java,true]package peerslee.hadoop.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.Flow;

import peerslee.hadoop.flow.FlowBean;

/**
*
* @author peerslee
*
*功能实现:对日志分析之后,通过用户手机号的归属地,将数据分到不同输入文件/part-r-XXXXX
*1.改造分区逻辑,自定义一个partitioner
*2.自定义reducer task 的并发数
*/
public class FlowSumArea {
        
        public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
                @Override
                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        
                        //取
                        String line = value.toString();
                        
                        //截
                        String []fields = StringUtils.split(line,"\t");
                        
                        //拿
                        String phoneNB = fields[1];
                        long u_flow = Long.parseLong(fields[7]);
                        long d_flow = Long.parseLong(fields[8]);
                        
                        //装
                        context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
                        
                }
        }
        
        public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
                @Override
                protected void reduce(Text key, Iterable<FlowBean> value, Context context)
                                throws IOException, InterruptedException {
                        
                        long u_sum = 0;
                        long d_sum = 0;
                        
                        for(FlowBean bean: value) {
                                u_sum += bean.getU_flow();
                                d_sum += bean.getD_flow();
                        }
                        
                        context.write(key, new FlowBean(key.toString(),u_sum,d_sum));
                }
        }
        
        public static void main(String[] args) throws Exception {
                //job
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);
               
                //set
                job.setJarByClass(FlowSumArea.class);
                job.setMapperClass(FlowSumAreaMapper.class);
                job.setReducerClass(FlowSumAreaReducer.class);
               
                //设置我们自己的分组逻辑
                job.setPartitionerClass(AreaPartioner.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(FlowBean.class);
               
                //设置reduce task 数量
                /*
                 * 1.不设置 默认为1个
                 * 2。设置为 1 ,不会报错,结果都将写到一个文件中
                 * 3.少于我们的分组数量,报错
                 * 4.大于 6(我们的分组数)不会报错 多余的输出文件为空
                 */
                job.setNumReduceTasks(6);
               
                FileInputFormat.setInputPaths(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
               
                System.exit(job.waitForCompletion(true)?0:1);
               
        }
}
[/mw_shl_code]

  • 测试:


[mw_shl_code=shell,true][hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.areapartition.FlowSumArea  /flow/input /flow/output3
16/03/11 07:48:46 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/11 07:48:46 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/11 07:48:47 INFO input.FileInputFormat: Total input paths to process : 1
16/03/11 07:48:47 INFO mapreduce.JobSubmitter: number of splits:1
16/03/11 07:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457700775861_0004
16/03/11 07:48:48 INFO impl.YarnClientImpl: Submitted application application_1457700775861_0004
16/03/11 07:48:48 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457700775861_0004/
16/03/11 07:48:48 INFO mapreduce.Job: Running job: job_1457700775861_0004
16/03/11 07:56:25 INFO mapreduce.Job: Job job_1457700775861_0004 running in uber mode : false
16/03/11 07:56:25 INFO mapreduce.Job:  map 0% reduce 0%
16/03/11 07:56:31 INFO mapreduce.Job:  map 100% reduce 0%
16/03/11 07:56:54 INFO mapreduce.Job:  map 100% reduce 17%
16/03/11 07:57:00 INFO mapreduce.Job:  map 100% reduce 33%
16/03/11 07:57:02 INFO mapreduce.Job:  map 100% reduce 50%
16/03/11 07:57:04 INFO mapreduce.Job:  map 100% reduce 83%
16/03/11 07:57:05 INFO mapreduce.Job:  map 100% reduce 100%
16/03/11 07:57:05 INFO mapreduce.Job: Job job_1457700775861_0004 completed successfully
16/03/11 07:57:06 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=1152
                FILE: Number of bytes written=651694
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2332
                HDFS: Number of bytes written=775
                HDFS: Number of read operations=21
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=12
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=6
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4007
                Total time spent by all reduces in occupied slots (ms)=157333
                Total time spent by all map tasks (ms)=4007
                Total time spent by all reduce tasks (ms)=157333
                Total vcore-seconds taken by all map tasks=4007
                Total vcore-seconds taken by all reduce tasks=157333
                Total megabyte-seconds taken by all map tasks=4103168
                Total megabyte-seconds taken by all reduce tasks=161108992
        Map-Reduce Framework
                Map input records=22
                Map output records=22
                Map output bytes=1072
                Map output materialized bytes=1152
                Input split bytes=118
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=1152
                Reduce input records=22
                Reduce output records=21
                Spilled Records=44
                Shuffled Maps =6
                Failed Shuffles=0
                Merged Map outputs=6
                GC time elapsed (ms)=1338
                CPU time spent (ms)=5850
                Physical memory (bytes) snapshot=505884672
                Virtual memory (bytes) snapshot=2548097024
                Total committed heap usage (bytes)=216862720
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=2214
        File Output Format Counters
                Bytes Written=775
[hadoop@master ~]$ hadoop fs -cat  /flow/output3/part-r-00000
13502468823     13502468823 102 7335    7437
13560439658     13560439658 5892        400     6292
[hadoop@master ~]$ hadoop fs -cat  /flow/output3/part-r-00001
13600217502     13600217502 186852      200     187052
13602846565     13602846565 12  1938    1950
13660577991     13660577991 9   6960    6969
[hadoop@master ~]$ hadoop fs -cat  /flow/output3/part-r-00002
13719199419     13719199419 0   200     200
13726230503     13726230503 2481        24681   27162
13760778710     13760778710 120 200     320
[hadoop@master ~]$ hadoop fs -cat  /flow/output3/part-r-00005
13480253104     13480253104 180 200     380
15013685858     15013685858 27  3659    3686
15920133257     15920133257 20  3156    3176
15989002119     15989002119 3   1938    1941
18211575961     18211575961 12  1527    1539
18320173382     18320173382 18  9531    9549
84138413        84138413 4116   1432    5548
[hadoop@master ~]$ [/mw_shl_code]






shuffle机制工作流程:

2016-03-12_212453.png

已有(1)人评论

跳转到指定楼层
xuezhiji 发表于 2016-5-4 14:22:12
不错,学习了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条