Guiding questions:
1. How do you run an MR program in local mode?
2. How do you run an MR program in cluster mode?
3. How do you distinguish byte streams from character streams?
4. How do you implement flow (traffic) summation with the MapReduce framework?
5. How do you implement custom sorting in Hadoop?
6. How do you implement custom partitioning in an MR program?
7. How does the shuffle mechanism work?
Solution:
Environment:
CentOS 6.5 + JDK 1.7 + Hadoop 2.4.1
Local mode for MR programs:
Run in local mode directly from Eclipse on Linux (Hadoop 2.4.1 is already installed in pseudo-distributed mode; this precondition is not repeated below):
Even without adding any YARN-related configuration, the job is submitted to LocalJobRunner for execution.
(1) Make sure the YARN jars are already in your User Library.
(2) Modify the code as needed (a minimal sketch follows after this list).
(3) Run the main method of WCRunner.java.
(4) Check the output.
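A minimal sketch of what such a local-mode runner can look like. The WCMapper/WCReducer word-count classes and the local input/output paths below are placeholder assumptions for illustration, not necessarily identical to the WCRunner.java used above; the point is only that, with no YARN settings on the classpath, the job runs in LocalJobRunner against the local file system.
[mw_shl_code=java,true]package peerslee.hadoop.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCRunner {

    // word count mapper: emits one <word, 1> pair per word in the line
    public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), " ");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    // word count reducer: sums the 1s for each word
    public static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // with no yarn-site.xml / mapred-site.xml on the classpath the job is handled
        // by LocalJobRunner anyway; the two settings below just make that explicit
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WCRunner.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // in local mode both paths live on the local file system (assumed locations)
        FileInputFormat.setInputPaths(job, new Path("/home/hadoop/wc/input"));
        FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/wc/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]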
---------------------------------------------------------------------------------------
Operating on HDFS from Eclipse on Linux, still in local mode:
(1) Start HDFS -> start-dfs.sh
(2) Modify the code as follows (see the sketch after this list):
(3) Run the job and check the result on HDFS.
(4) If an exception appears at this step,
copying the two configuration files into the src directory fixes it.
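Why copying the configuration files works (assuming the two files are core-site.xml and hdfs-site.xml): Hadoop's Configuration picks up *-site.xml files found on the classpath, so once they sit in src the client reads the correct fs.defaultFS and talks to HDFS instead of the local file system. A tiny, hypothetical check:
[mw_shl_code=java,true]import org.apache.hadoop.conf.Configuration;

// illustrative only: prints which file system the client-side Configuration resolves.
// With core-site.xml on the classpath this prints the hdfs:// address of the NameNode;
// without it, it falls back to the default from core-default.xml (file:///).
public class ConfCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    }
}
[/mw_shl_code]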
Cluster mode for MR programs:
Package the program as a jar, upload it to the cluster, and run it in cluster mode
(see the earlier post, MapReduce 学习笔记(1)).
---------------------------------------------------------------------------------
Running in cluster mode from Eclipse on Linux:
(1) Add mapred-site.xml and yarn-site.xml to the project's src directory.
(2) Package the project as a jar (wc.jar) and add one configuration parameter in the main method: conf.set("mapreduce.job.jar","wc.jar"); (a minimal sketch follows below)
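A minimal sketch of step (2) in the runner's main method. The class name WCClusterRunner is made up for illustration and the mapper/reducer are reused from the local-mode sketch earlier; the essential parts are the two *-site.xml files on the classpath and the conf.set("mapreduce.job.jar", "wc.jar") call.
[mw_shl_code=java,true]package peerslee.hadoop.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WCClusterRunner {
    public static void main(String[] args) throws Exception {
        // mapred-site.xml / yarn-site.xml in src are read from the classpath,
        // so the job goes to YARN instead of LocalJobRunner
        Configuration conf = new Configuration();
        // submitting from Eclipse: tell the framework which jar to ship to the
        // cluster, otherwise the map/reduce classes cannot be found on the nodes
        conf.set("mapreduce.job.jar", "wc.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WCClusterRunner.class);
        // reuse the mapper/reducer from the local-mode sketch above (assumed names)
        job.setMapperClass(WCRunner.WCMapper.class);
        job.setReducerClass(WCRunner.WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]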
Character streams (String, Long objects) -serialization-> byte streams (binary data):
Byte streams: FileInputStream, FileOutputStream
Character streams: FileReader, FileWriter
A character stream works in units of 2-byte Unicode characters and operates on characters, character arrays, or strings; a byte stream works in units of single bytes and operates on bytes and byte arrays. A character stream is therefore what the JVM produces by converting bytes into 2-byte Unicode characters, which is why it handles multilingual text well. For audio files, images, and music, byte streams are the better choice; for anything involving (Chinese) text, character streams are more convenient. Everything on disk is stored as bytes: characters are encoded into bytes before being written, and when a file (especially a text file) is read back, it is read byte by byte into a byte sequence. Byte streams can handle any kind of data, including binary data, whereas character streams only handle characters and strings; byte streams cover every kind of I/O but cannot process Unicode characters directly, while character streams can. A byte stream can be wrapped into a character stream with InputStreamReader and OutputStreamWriter.
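As a small illustration of that last point (file names and charset below are assumptions, not from the original post), the following copies a text file by wrapping the byte streams in InputStreamReader / OutputStreamWriter with an explicit encoding, so Chinese text survives the byte <-> character conversion:
[mw_shl_code=java,true]import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;

// bridge byte streams to character streams with an explicit charset
public class StreamBridgeDemo {
    public static void main(String[] args) throws Exception {
        try (BufferedReader reader = new BufferedReader(
                     new InputStreamReader(new FileInputStream("in.txt"), "UTF-8"));
             OutputStreamWriter writer =
                     new OutputStreamWriter(new FileOutputStream("out.txt"), "UTF-8")) {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);   // characters are re-encoded to bytes on write
                writer.write('\n');
            }
        }
    }
}
[/mw_shl_code]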
Implementing flow (traffic) summation with the MapReduce framework:
The Mapper extracts the fields we need from each input record; the Reducer receives the records grouped by the Mapper's output key and sums them to get each user's upstream traffic, downstream traffic, and total traffic. The Mapper's output value is a JavaBean we define ourselves to wrap these fields, which is where Hadoop's serialization interface, Writable, comes in.
FlowBean:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
*
* @author peerslee
*
*FlowBean is our own data type. Because it has to be passed between Hadoop nodes,
*it must follow Hadoop's serialization mechanism, i.e. implement Hadoop's
*serialization interface Writable (an interface, and an important one).
*
*Hadoop's serialization mechanism does not carry the object's class hierarchy
*(the JDK's built-in serialization does).
*/
public class FlowBean implements Writable{
//fields
private String phoneNB;
private long u_flow;
private long d_flow;
private long s_flow;
//the no-arg constructor is required by the reflection-based deserialization
public FlowBean() {}
//constructor
public FlowBean(String phoneNB, long u_flow, long d_flow) {
this.phoneNB = phoneNB;
this.u_flow = u_flow;
this.d_flow = d_flow;
this.s_flow = u_flow + d_flow;
}
public String getPhoneNB() {
return phoneNB;
}
public void setPhoneNB(String phoneNB) {
this.phoneNB = phoneNB;
}
public long getU_flow() {
return u_flow;
}
public void setU_flow(long u_flow) {
this.u_flow = u_flow;
}
public long getD_flow() {
return d_flow;
}
public void setD_flow(long d_flow) {
this.d_flow = d_flow;
}
public long getS_flow() {
return s_flow;
}
public void setS_flow(long s_flow) {
this.s_flow = s_flow;
}
//serialize the object into the output stream
@Override
public void write(DataOutput out) throws IOException {
//write the fields in a fixed order
out.writeUTF(phoneNB);
out.writeLong(u_flow);
out.writeLong(d_flow);
out.writeLong(s_flow);
}
//deserialize the object from the input stream
@Override
public void readFields(DataInput in) throws IOException {
//read the fields back with the same types and in the same order they were written
phoneNB = in.readUTF();
u_flow = in.readLong();
d_flow = in.readLong();
s_flow = in.readLong();
}
//override toString
@Override
public String toString() {
return "" + u_flow + "\t" + d_flow + "\t" + s_flow;
}
}
[/mw_shl_code]
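To make the serialization mechanism described in the comments concrete, here is a small round-trip sketch (not part of the original post) using Hadoop's DataOutputBuffer / DataInputBuffer helpers; the sample values are taken from the flow output further down:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// shows the Writable round trip the framework performs when shipping a FlowBean:
// write() into a byte buffer, then readFields() into a fresh (no-arg constructed) instance
public class FlowBeanSerDemo {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13560439658", 5892, 400);

        // serialize
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // deserialize into a new object created via the no-arg constructor
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        FlowBean copy = new FlowBean();
        copy.readFields(in);

        System.out.println(copy.getPhoneNB() + "\t" + copy);  // 13560439658  5892  400  6292
    }
}
[/mw_shl_code]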
Mapper:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* mapper business logic:
* read the input line by line, split each line to pick out the fields we need,
* and wrap them as a k-v pair to send to the reducer
*/
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//the value is one line of the input
String line = value.toString();
//split the line
String []fields = StringUtils.split(line, "\t");
//pick out the fields we need
String phoneNB = fields[1];
long u_flow = Long.parseLong(fields[7]);
long d_flow = Long.parseLong(fields[8]);
//wrap and emit
context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
}
}
[/mw_shl_code]
Reducer:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* reducer business logic:
* the framework calls reduce() once per group <phone number, list of FlowBeans>;
* reduce() iterates over the values, sums them up, and writes the result out
*/
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
@Override
protected void reduce(Text key, Iterable<FlowBean> value,Context context)
throws IOException, InterruptedException {
long u_flow_sum = 0;
long d_flow_sum = 0;
//accumulate
for(FlowBean bean : value) {
u_flow_sum += bean.getU_flow();
d_flow_sum += bean.getD_flow();
}
//wrap and emit
context.write(key, new FlowBean(key.toString(), u_flow_sum, d_flow_sum));
}
}
[/mw_shl_code]
Runner:
[mw_shl_code=java,true]package peerslee.hadoop.flowsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowSumRunner extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
//job
//note: a fresh Configuration is created here instead of using getConf(), which is why
//the submission log below still warns that command-line option parsing was not performed
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//setJar
job.setJarByClass(FlowSumRunner.class);
//MapperClass & ReducerClass
job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);
//OutputKey - Outputvalue
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//InputPaths - OutputPaths
//FileInputFormat / FileOutputFormat are the ones from the org.apache.hadoop.mapreduce.lib packages
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//note: this returns 1 on success (main prints it below); the usual exit-code convention is 0 for success
return job.waitForCompletion(true) ? 1 : 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
System.out.println(res);
}
}
[/mw_shl_code]
[mw_shl_code=bash,true][hadoop@master ~]$ hadoop fs -put HTTP_20130313143750.dat /flow/input
[hadoop@master ~]$ hadoop fs -rm -r /flow/output
16/03/10 03:39:17 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /flow/output
[hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.flowsum.FlowSumRunner /flow/input /flow/output
16/03/10 03:39:30 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/10 03:39:30 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/10 03:39:30 INFO input.FileInputFormat: Total input paths to process : 1
16/03/10 03:39:31 INFO mapreduce.JobSubmitter: number of splits:1
16/03/10 03:39:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457609295138_0002
16/03/10 03:39:31 INFO impl.YarnClientImpl: Submitted application application_1457609295138_0002
16/03/10 03:39:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457609295138_0002/
16/03/10 03:39:31 INFO mapreduce.Job: Running job: job_1457609295138_0002
16/03/10 03:39:38 INFO mapreduce.Job: Job job_1457609295138_0002 running in uber mode : false
16/03/10 03:39:38 INFO mapreduce.Job: map 0% reduce 0%
16/03/10 03:39:43 INFO mapreduce.Job: map 100% reduce 0%
16/03/10 03:39:49 INFO mapreduce.Job: map 100% reduce 100%
16/03/10 03:39:49 INFO mapreduce.Job: Job job_1457609295138_0002 completed successfully
16/03/10 03:39:50 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1122
FILE: Number of bytes written=188013
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2332
HDFS: Number of bytes written=526
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3145
Total time spent by all reduces in occupied slots (ms)=3462
Total time spent by all map tasks (ms)=3145
Total time spent by all reduce tasks (ms)=3462
Total vcore-seconds taken by all map tasks=3145
Total vcore-seconds taken by all reduce tasks=3462
Total megabyte-seconds taken by all map tasks=3220480
Total megabyte-seconds taken by all reduce tasks=3545088
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1122
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1122
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=186
CPU time spent (ms)=1020
Physical memory (bytes) snapshot=216825856
Virtual memory (bytes) snapshot=725782528
Total committed heap usage (bytes)=136908800
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=526
1
[hadoop@master ~]$ hadoop fs -cat /flow/output/part-r-00000
13480253104 180 200 380
13502468823 102 7335 7437
13560439658 5892 400 6292
13600217502 186852 200 187052
13602846565 12 1938 1950
13660577991 9 6960 6969
13719199419 0 200 200
13726230503 2481 24681 27162
13760778710 120 200 320
13823070001 180 200 380
13826544101 0 200 200
13922314466 3008 3720 6728
13925057413 63 11058 11121
13926251106 0 200 200
13926435656 1512 200 1712
15013685858 27 3659 3686
15920133257 20 3156 3176
15989002119 3 1938 1941
18211575961 12 1527 1539
18320173382 18 9531 9549
84138413 4116 1432 5548[/mw_shl_code]
Implementing custom sorting in Hadoop:
FlowBean:
[mw_shl_code=java,true]package peerslee.hadoop.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/*
* implement the compareTo() method of the WritableComparable interface
*/
public class FlowBean implements WritableComparable<FlowBean>{
//fields
private String phoneNB;
private long u_flow;
private long d_flow;
private long s_flow;
//the no-arg constructor is required by the reflection-based deserialization
public FlowBean() {}
//constructor
public FlowBean(String phoneNB, long u_flow, long d_flow) {
this.phoneNB = phoneNB;
this.u_flow = u_flow;
this.d_flow = d_flow;
this.s_flow = u_flow + d_flow;
}
public String getPhoneNB() {
return phoneNB;
}
public void setPhoneNB(String phoneNB) {
this.phoneNB = phoneNB;
}
public long getU_flow() {
return u_flow;
}
public void setU_flow(long u_flow) {
this.u_flow = u_flow;
}
public long getD_flow() {
return d_flow;
}
public void setD_flow(long d_flow) {
this.d_flow = d_flow;
}
public long getS_flow() {
return s_flow;
}
public void setS_flow(long s_flow) {
this.s_flow = s_flow;
}
//serialize the object into the output stream
@Override
public void write(DataOutput out) throws IOException {
//write the fields in a fixed order
out.writeUTF(phoneNB);
out.writeLong(u_flow);
out.writeLong(d_flow);
out.writeLong(s_flow);
}
//deserialize the object from the input stream
@Override
public void readFields(DataInput in) throws IOException {
//read the fields back with the same types and in the same order they were written
phoneNB = in.readUTF();
u_flow = in.readLong();
d_flow = in.readLong();
s_flow = in.readLong();
}
//override toString
@Override
public String toString() {
return "" + phoneNB + " " + u_flow + "\t" + d_flow + "\t" + s_flow;
}
@Override
public int compareTo(FlowBean o) {
/*
* the natural ordering would return 1 when this bean's total is larger (ascending);
* the sign is flipped here so the sort is by total flow in descending order.
* Note that 0 is never returned, so beans with equal totals still count as distinct keys/groups.
*/
return s_flow > o.getS_flow() ? -1 : 1;
}
}
[/mw_shl_code]
SortMR:
[mw_shl_code=java,true]package peerslee.hadoop.flow;
/**
* sort the output of FlowSum by total flow in descending order
*/
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortMR {
//static nested classes hold the map and reduce logic
//mapper
public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//read the line
String line = value.toString();
//split it
String fields[] = StringUtils.split(line, "\t");
//pick out the fields, wrap, and emit
String phoneNB = fields[0];
long u_flow = Long.parseLong(fields[1]);
long d_flow = Long.parseLong(fields[2]);
context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get());
}
}
//reducer
public static class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {
@Override
protected void reduce(FlowBean key, Iterable<NullWritable> value,Context context)
throws IOException, InterruptedException {
//emit the key; the framework has already sorted the FlowBean keys before reduce is called
context.write(key, NullWritable.get());
}
}
//runner
public static void main(String[] args) throws Exception {
//get the job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//set the jar, mapper class, reducer class, output key class, output value class
job.setJarByClass(SortMR.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(NullWritable.class);
//input path, output path
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//waitForCompletion
System.exit(job.waitForCompletion(true)?0:1);
}
}
[/mw_shl_code]
[mw_shl_code=shell,true][hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.flow.SortMR /flow/output/part-r-00000 /flow/sort_output
16/03/11 05:57:24 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/11 05:57:25 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/11 05:57:25 INFO input.FileInputFormat: Total input paths to process : 1
16/03/11 05:57:25 INFO mapreduce.JobSubmitter: number of splits:1
16/03/11 05:57:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457700775861_0002
16/03/11 05:57:26 INFO impl.YarnClientImpl: Submitted application application_1457700775861_0002
16/03/11 05:57:26 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457700775861_0002/
16/03/11 05:57:26 INFO mapreduce.Job: Running job: job_1457700775861_0002
16/03/11 05:57:34 INFO mapreduce.Job: Job job_1457700775861_0002 running in uber mode : false
16/03/11 05:57:34 INFO mapreduce.Job: map 0% reduce 0%
16/03/11 05:57:42 INFO mapreduce.Job: map 100% reduce 0%
16/03/11 05:57:48 INFO mapreduce.Job: map 100% reduce 100%
16/03/11 05:57:49 INFO mapreduce.Job: Job job_1457700775861_0002 completed successfully
16/03/11 05:57:49 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=822
FILE: Number of bytes written=186803
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=634
HDFS: Number of bytes written=526
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=4789
Total time spent by all reduces in occupied slots (ms)=3885
Total time spent by all map tasks (ms)=4789
Total time spent by all reduce tasks (ms)=3885
Total vcore-seconds taken by all map tasks=4789
Total vcore-seconds taken by all reduce tasks=3885
Total megabyte-seconds taken by all map tasks=4903936
Total megabyte-seconds taken by all reduce tasks=3978240
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=774
Map output materialized bytes=822
Input split bytes=108
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=822
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=237
CPU time spent (ms)=1370
Physical memory (bytes) snapshot=217374720
Virtual memory (bytes) snapshot=725647360
Total committed heap usage (bytes)=136908800
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=526
File Output Format Counters
Bytes Written=526
[hadoop@master ~]$ hadoop fs -cat /flow/sort_output/part-r-00000
13600217502 186852 200 187052
13726230503 2481 24681 27162
13925057413 63 11058 11121
18320173382 18 9531 9549
13502468823 102 7335 7437
13660577991 9 6960 6969
13922314466 3008 3720 6728
13560439658 5892 400 6292
84138413 4116 1432 5548
15013685858 27 3659 3686
15920133257 20 3156 3176
13602846565 12 1938 1950
15989002119 3 1938 1941
13926435656 1512 200 1712
18211575961 12 1527 1539
13480253104 180 200 380
13823070001 180 200 380
13760778710 120 200 320
13826544101 0 200 200
13926251106 0 200 200
13719199419 0 200 200[/mw_shl_code]
Implementing custom partitioning in an MR program:
AreaPartioner:
[mw_shl_code=java,true]package peerslee.hadoop.areapartition;
import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;
/*
* extend Partitioner to define our own partitioning scheme
*/
public class AreaPartioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
/*
* phone-number-prefix -> area dictionary (a HashMap gives fast key-value lookups)
*/
public static HashMap<String, Integer> areaMap = new HashMap<>();
static{
areaMap.put("135", 0);
areaMap.put("136", 1);
areaMap.put("137", 2);
areaMap.put("138", 3);
areaMap.put("139", 4);
}
/*
* take the phone number prefix from the key, look up its home area,
* and return the corresponding partition number
*/
@Override
public int getPartition(KEY key, VALUE value, int numPartition) {
//look up the 3-digit prefix in the dictionary; unknown prefixes go to partition 5
Integer areaNum = areaMap.get(key.toString().substring(0, 3));
return areaNum == null ? 5 : areaNum;
}
}
[/mw_shl_code]
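For comparison, when no custom partitioner is set the framework uses HashPartitioner, whose partitioning logic is essentially the one-liner below (restated here for illustration). It spreads keys across reducers by hash code, so all records for one phone number stay together, but phone numbers from the same area do not; that is exactly what the AreaPartioner above changes.
[mw_shl_code=java,true]import org.apache.hadoop.mapreduce.Partitioner;

// the default hash-based partitioning: partition = positive hash of the key, modulo
// the number of reduce tasks; area information plays no role in the assignment
public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
[/mw_shl_code]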
FlowSumArea:
[mw_shl_code=java,true]package peerslee.hadoop.areapartition;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import peerslee.hadoop.flow.FlowBean;
/**
*
* @author peerslee
*
*Goal: after analyzing the log, split the results into different output files
*(part-r-XXXXX) according to the area each user's phone number belongs to.
*1. change the partitioning logic with a custom Partitioner
*2. set the number of reduce tasks (the reducer parallelism) accordingly
*/
public class FlowSumArea {
public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//read the line
String line = value.toString();
//split it
String []fields = StringUtils.split(line,"\t");
//pick out the fields we need
String phoneNB = fields[1];
long u_flow = Long.parseLong(fields[7]);
long d_flow = Long.parseLong(fields[8]);
//wrap and emit
context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
}
}
public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> value, Context context)
throws IOException, InterruptedException {
long u_sum = 0;
long d_sum = 0;
for(FlowBean bean: value) {
u_sum += bean.getU_flow();
d_sum += bean.getD_flow();
}
context.write(key, new FlowBean(key.toString(),u_sum,d_sum));
}
}
public static void main(String[] args) throws Exception {
//job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//set
job.setJarByClass(FlowSumArea.class);
job.setMapperClass(FlowSumAreaMapper.class);
job.setReducerClass(FlowSumAreaReducer.class);
//plug in our own partitioning logic
job.setPartitionerClass(AreaPartioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//set the number of reduce tasks
/*
* 1. if it is not set, the default is 1
* 2. setting it to 1 does not fail: all results are written into a single output file
* 3. setting it to fewer than our number of partitions fails
* 4. setting it to more than 6 (our number of partitions) does not fail; the extra output files are simply empty
*/
job.setNumReduceTasks(6);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
[/mw_shl_code]
[mw_shl_code=shell,true][hadoop@master ~]$ hadoop jar flow.jar peerslee.hadoop.areapartition.FlowSumArea /flow/input /flow/output3
16/03/11 07:48:46 INFO client.RMProxy: Connecting to ResourceManager at master/172.24.2.101:8032
16/03/11 07:48:46 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/11 07:48:47 INFO input.FileInputFormat: Total input paths to process : 1
16/03/11 07:48:47 INFO mapreduce.JobSubmitter: number of splits:1
16/03/11 07:48:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1457700775861_0004
16/03/11 07:48:48 INFO impl.YarnClientImpl: Submitted application application_1457700775861_0004
16/03/11 07:48:48 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1457700775861_0004/
16/03/11 07:48:48 INFO mapreduce.Job: Running job: job_1457700775861_0004
16/03/11 07:56:25 INFO mapreduce.Job: Job job_1457700775861_0004 running in uber mode : false
16/03/11 07:56:25 INFO mapreduce.Job: map 0% reduce 0%
16/03/11 07:56:31 INFO mapreduce.Job: map 100% reduce 0%
16/03/11 07:56:54 INFO mapreduce.Job: map 100% reduce 17%
16/03/11 07:57:00 INFO mapreduce.Job: map 100% reduce 33%
16/03/11 07:57:02 INFO mapreduce.Job: map 100% reduce 50%
16/03/11 07:57:04 INFO mapreduce.Job: map 100% reduce 83%
16/03/11 07:57:05 INFO mapreduce.Job: map 100% reduce 100%
16/03/11 07:57:05 INFO mapreduce.Job: Job job_1457700775861_0004 completed successfully
16/03/11 07:57:06 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1152
FILE: Number of bytes written=651694
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2332
HDFS: Number of bytes written=775
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
Job Counters
Launched map tasks=1
Launched reduce tasks=6
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=4007
Total time spent by all reduces in occupied slots (ms)=157333
Total time spent by all map tasks (ms)=4007
Total time spent by all reduce tasks (ms)=157333
Total vcore-seconds taken by all map tasks=4007
Total vcore-seconds taken by all reduce tasks=157333
Total megabyte-seconds taken by all map tasks=4103168
Total megabyte-seconds taken by all reduce tasks=161108992
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1072
Map output materialized bytes=1152
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=1152
Reduce input records=22
Reduce output records=21
Spilled Records=44
Shuffled Maps =6
Failed Shuffles=0
Merged Map outputs=6
GC time elapsed (ms)=1338
CPU time spent (ms)=5850
Physical memory (bytes) snapshot=505884672
Virtual memory (bytes) snapshot=2548097024
Total committed heap usage (bytes)=216862720
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=775
[hadoop@master ~]$ hadoop fs -cat /flow/output3/part-r-00000
13502468823 13502468823 102 7335 7437
13560439658 13560439658 5892 400 6292
[hadoop@master ~]$ hadoop fs -cat /flow/output3/part-r-00001
13600217502 13600217502 186852 200 187052
13602846565 13602846565 12 1938 1950
13660577991 13660577991 9 6960 6969
[hadoop@master ~]$ hadoop fs -cat /flow/output3/part-r-00002
13719199419 13719199419 0 200 200
13726230503 13726230503 2481 24681 27162
13760778710 13760778710 120 200 320
[hadoop@master ~]$ hadoop fs -cat /flow/output3/part-r-00005
13480253104 13480253104 180 200 380
15013685858 15013685858 27 3659 3686
15920133257 15920133257 20 3156 3176
15989002119 15989002119 3 1938 1941
18211575961 18211575961 12 1527 1539
18320173382 18320173382 18 9531 9549
84138413 84138413 4116 1432 5548
[hadoop@master ~]$ [/mw_shl_code]
How the shuffle mechanism works:
In outline: each map task writes its output into a memory buffer; when the buffer fills past a threshold, the records are partitioned (by the Partitioner, e.g. the AreaPartioner above), sorted by key within each partition, and spilled to disk, and the spill files are finally merged into one sorted, partitioned file per map task. Each reduce task then fetches its own partition from every map task, merges and sorts the fetched segments, groups records with equal keys, and feeds each group to one reduce() call. This partition / sort / merge stage between map and reduce is the shuffle.