MapReduce应用案例1
本帖最后由 xuanxufeng 于 2015-8-25 21:19 编辑问题导读
1.各个部门的总工资如何进行问题分析的?
2.各个部门的总工资处理流程是怎样的?
3.个部门的人数和平均工资又是如何得到的?
4.个部门的人数和平均工资代码如何实现?
static/image/hrline/4.gif
案例所用包全部下载:链接: http://pan.baidu.com/s/1sjNyDIX 密码: **** Hidden Message *****
1、环境说明部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下创建/app目录,并修改该目录拥有者为shiyanlou(chown –R shiyanlou:shiyanlou /app)。Hadoop搭建环境:l虚拟机操作系统: CentOS6.664位,单核,1G内存lJDK:1.7.0_55 64位lHadoop:1.1.22、准备测试数据测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:dept文件内容:10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp文件内容:7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10
在/home/shiyanlou/install-pack/class6目录可以找到这两个文件,把这两个文件上传到HDFS中/class6/input目录中,执行如下命令:cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input
3、应用案例3.1 测试例子1:求各个部门的总工资3.1.1 问题分析MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。3.1.2 处理流程图3.1.3 测试代码Q1SumDeptSalary.java代码(vi编辑代码是不能存在中文):
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q1SumDeptSalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
// 对部门文件字段进行拆分并缓存到deptMap中
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(","), deptIdName.split(","));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
if (deptMap.containsKey(kv)) {
if (null != kv && !"".equals(kv.toString())) {
context.write(new Text(deptMap.get(kv.trim())), new Text(kv.trim()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 对同一部门的员工工资进行求和
long sumSalary = 0;
for (Text val : values) {
sumSalary += Long.parseLong(val.toString());
}
// 输出key为部门名称和value为该部门员工工资总和
context.write(key, new LongWritable(sumSalary));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称、Mapper和Reduce类
Job job = new Job(getConf(), "Q1SumDeptSalary");
job.setJobName("Q1SumDeptSalary");
job.setJarByClass(Q1SumDeptSalary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
System.exit(res);
}
}
3.1.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q1SumDeptSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q1SumDeptSalary.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java把编译好的代码打成jar包(如果不打成jar形式运行会提示class无法找到的错误)jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.classmv *.jar ../..rm Q1SumDept*.class3.1.5 运行并查看结果运行Q1SumDeptSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out1
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1运行成功后,刷新CentOS HDFS中的输出路径/class6/out1目录,打开part-r-00000文件hadoop fs -ls /class6/out1hadoop fs -cat /class6/out1/part-r-00000可以看到运行结果:ACCOUNTING8750RESEARCH6775SALES94003.2 测试例子2:求各个部门的人数和平均工资3.2.1 问题分析求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。3.2.2 处理流程图3.2.3 编写代码Q2DeptNumberAveSalary.java代码:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q2DeptNumberAveSalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
// 对部门文件字段进行拆分并缓存到deptMap中
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(","), deptIdName.split(","));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
if (deptMap.containsKey(kv)) {
if (null != kv && !"".equals(kv.toString())) {
context.write(new Text(deptMap.get(kv.trim())), new Text(kv.trim()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
long sumSalary = 0;
int deptNumber = 0;
// 对同一部门的员工工资进行求和
for (Text val : values) {
sumSalary += Long.parseLong(val.toString());
deptNumber++;
}
// 输出key为部门名称和value为该部门员工工资平均值
context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称、Mapper和Reduce类
Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
job.setJobName("Q2DeptNumberAveSalary");
job.setJarByClass(Q2DeptNumberAveSalary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
System.exit(res);
}
}
3.2.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q2DeptNumberAveSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q2DeptNumberAveSalary.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.classmv *.jar ../..rm Q2DeptNum*.class3.2.5 运行并查看结果运行Q2DeptNumberAveSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out2
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2运行成功后,刷新CentOS HDFS中的输出路径/class6/out2目录hadoop fs -ls /class6/out2hadoop fs -cat /class6/out2/part-r-00000打开part-r-00000文件,可以看到运行结果:ACCOUNTINGDept Number:3,Ave Salary:2916RESEARCHDept Number:3,Ave Salary:2258SALESDept Number:6,Ave Salary:15663.3 测试例子
3:求每个部门最早进入公司的员工姓名3.3.1 问题分析求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所有员工,找出最早进入公司的员工并输出。
3.3.2 处理流程图
3.3.3 编写代码
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q3DeptEarliestEmp extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(","), deptIdName.split(","));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// map join: 在map阶段过滤掉不需要的数据
// 输出key为部门名称和value为员工姓名+","+员工进入公司日期
if (deptMap.containsKey(kv)) {
if (null != kv && !"".equals(kv.toString())) {
context.write(new Text(deptMap.get(kv.trim())), new Text(kv.trim() + "," + kv.trim()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 员工姓名和进入公司日期
String empName = null;
String empEnterDate = null;
// 设置日期转换格式和最早进入公司的员工、日期
DateFormat df = new SimpleDateFormat("dd-MM月-yy");
Date earliestDate = new Date();
String earliestEmp = null;
// 遍历该部门下所有员工,得到最早进入公司的员工信息
for (Text val : values) {
empName = val.toString().split(",");
empEnterDate = val.toString().split(",").toString().trim();
try {
System.out.println(df.parse(empEnterDate));
if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {
earliestDate = df.parse(empEnterDate);
earliestEmp = empName;
}
} catch (ParseException e) {
e.printStackTrace();
}
}
// 输出key为部门名称和value为该部门最早进入公司员工
context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q3DeptEarliestEmp");
job.setJobName("Q3DeptEarliestEmp");
// 设置Mapper和Reduce类
job.setJarByClass(Q3DeptEarliestEmp.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
System.exit(res);
}
}
3.3.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q3DeptEarliestEmp.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q3DeptEarliestEmp.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.classmv *.jar ../..rm Q3DeptEar*.class3.3.5 运行并查看结果运行Q3DeptEarliestEmp时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out3
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3运行成功后,刷新CentOS HDFS中的输出路径/class6/out3目录hadoop fs -ls /class6/out3hadoop fs -cat /class6/out3/part-r-00000打开part-r-00000文件,可以看到运行结果:ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:1981-06-09RESEARCHThe earliest emp of dept:SMITH, Enter date:1980-12-17SALESThe earliest emp of dept:ALLEN, Enter date:1981-02-203.4 测试例子4:求各个城市的员工的总工资3.4.1 问题分析求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和测试例子1类似在Mapper的Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资总数并输出。3.4.2 处理流程图3.4.3 编写代码
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q4SumCitySalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在城市名称
deptMap.put(deptIdName.split(","), deptIdName.split(","));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// map join: 在map阶段过滤掉不需要的数据,输出key为城市名称和value为员工工资
if (deptMap.containsKey(kv)) {
if (null != kv && !"".equals(kv.toString())) {
context.write(new Text(deptMap.get(kv.trim())), new Text(kv.trim()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 对同一城市的员工工资进行求和
long sumSalary = 0;
for (Text val : values) {
sumSalary += Long.parseLong(val.toString());
}
// 输出key为城市名称和value为该城市工资总和
context.write(key, new LongWritable(sumSalary));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q4SumCitySalary");
job.setJobName("Q4SumCitySalary");
// 设置Mapper和Reduce类
job.setJarByClass(Q4SumCitySalary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
System.exit(res);
}
}
3.4.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q4SumCitySalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q4SumCitySalary.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q4SumCitySalary.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.classmv *.jar ../..rm Q4SumCity*.class3.4.5 运行并查看结果运行Q4SumCitySalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out4
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4运行成功后,刷新CentOS HDFS中的输出路径/class6/out4目录hadoop fs -ls /class6/out4hadoop fs -cat /class6/out4/part-r-00000打开part-r-00000文件,可以看到运行结果:CHICAGO9400DALLAS 6775NEW YORK 87503.5 测试例子5:列出工资比上司高的员工姓名及其工资3.5.1 问题分析求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为员工编号、value为"M,该员工工资",员工对应经理表数据key为经理编号、value为"E,该员工姓名,该员工工资";然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。3.5.2 处理流程图3.5.3 编写代码
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q5EarnMoreThanManager extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
String[] kv = value.toString().split(",");
// 输出经理表数据,其中key为员工编号和value为M+该员工工资
context.write(new Text(kv.toString()), new Text("M," + kv));
// 输出员工对应经理表数据,其中key为经理编号和value为(E,该员工姓名,该员工工资)
if (null != kv && !"".equals(kv.toString())) {
context.write(new Text(kv.toString()), new Text("E," + kv + "," + kv));
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 定义员工姓名、工资和存放部门员工Map
String empName;
long empSalary = 0;
HashMap<String, Long> empMap = new HashMap<String, Long>();
// 定义经理工资变量
long mgrSalary = 0;
for (Text val : values) {
if (val.toString().startsWith("E")) {
// 当是员工标示时,获取该员工对应的姓名和工资并放入Map中
empName = val.toString().split(",");
empSalary = Long.parseLong(val.toString().split(","));
empMap.put(empName, empSalary);
} else {
// 当时经理标志时,获取该经理工资
mgrSalary = Long.parseLong(val.toString().split(","));
}
}
// 遍历该经理下属,比较员工与经理工资高低,输出工资高于经理的员工
for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) {
if (entry.getValue() > mgrSalary) {
context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
}
}
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q5EarnMoreThanManager");
job.setJobName("Q5EarnMoreThanManager");
// 设置Mapper和Reduce类
job.setJarByClass(Q5EarnMoreThanManager.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
System.exit(res);
}
}
3.5.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q5EarnMoreThanManager.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.classmv *.jar ../..rm Q5EarnMore*.class3.5.5 运行并查看结果运行Q5EarnMoreThanManager运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out5
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5运行成功后,刷新CentOS HDFS中的输出路径/class6/out5目录hadoop fs -ls /class6/out5hadoop fs -cat /class6/out5/part-r-00000打开part-r-00000文件,可以看到运行结果:FORD30003.6测试例子6:列出工资比公司平均工资要高的员工姓名及其工资3.6.1 问题分析求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,从而能先求出平均工资,然后进行比较得出结果。在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为0、value为"该员工姓名 ,员工工资";然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。3.6.2 处理流程图3.6.3 编写代码
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q6HigherThanAveSalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
String[] kv = value.toString().split(",");
// 获取所有员工数据,其中key为0和value为该员工工资
context.write(new IntWritable(0), new Text(kv));
// 获取所有员工数据,其中key为0和value为(该员工姓名 ,员工工资)
context.write(new IntWritable(1), new Text(kv + "," + kv));
}
}
public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {
// 定义员工工资、员工数和平均工资
private long allSalary = 0;
private int allEmpCount = 0;
private long aveSalary = 0;
// 定义员工工资变量
private long empSalary = 0;
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
if (0 == key.get()) {
// 获取所有员工工资和员工数
allSalary += Long.parseLong(val.toString());
allEmpCount++;
System.out.println("allEmpCount = " + allEmpCount);
} else if (1 == key.get()) {
if (aveSalary == 0) {
aveSalary = allSalary / allEmpCount;
context.write(new Text("Average Salary = "), new Text("" + aveSalary));
context.write(new Text("Following employees have salarys higher than Average:"), new Text(""));
}
// 获取员工的平均工资
System.out.println("Employee salary = " + val.toString());
aveSalary = allSalary / allEmpCount;
// 比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资
empSalary = Long.parseLong(val.toString().split(","));
if (empSalary > aveSalary) {
context.write(new Text(val.toString().split(",")), new Text("" + empSalary));
}
}
}
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q6HigherThanAveSalary");
job.setJobName("Q6HigherThanAveSalary");
// 设置Mapper和Reduce类
job.setJarByClass(Q6HigherThanAveSalary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 必须设置Reduce任务数为1 # -D mapred.reduce.tasks = 1
// 这是该作业设置的核心,这样才能够保证各reduce是串行的
job.setNumReduceTasks(1);
// 设置输出格式类
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// 设置输出键和值类型
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
System.exit(res);
}
}
3.6.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q6HigherThanAveSalary.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.classmv *.jar ../..rm Q6HigherThan*.class3.6.5 运行并查看结果运行Q6HigherThanAveSalary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out6
运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6运行成功后,刷新CentOS HDFS中的输出路径/class6/out6目录hadoop fs -ls /class6/out6hadoop fs -cat /class6/out6/part-r-00000打开part-r-00000文件,可以看到运行结果:Average Salary = 2077Following employees have salarys higher than Average: FORD3000CLARK2450KING 5000JONES2975BLAKE28503.7 测试例子7:列出名字以J开头的员工姓名及其所属部门名称3.7.1 问题分析求名字以J开头的员工姓名机器所属部门名称,只需判断员工姓名是否以J开头。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段判断员工姓名是否以J开头,如果是抽取出员工姓名和员工所在部门编号,利用缓存部门数据把部门编号对应为部门名称,转换后输出结果。3.7.2 处理流程图3.7.3 编写代码
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q7NameDeptOfStartJ extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
// 对部门文件字段进行拆分并缓存到deptMap中
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(","), deptIdName.split(","));
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// 输出员工姓名为J开头的员工信息,key为员工姓名和value为员工所在部门名称
if (kv.toString().trim().startsWith("J")) {
context.write(new Text(kv.trim()), new Text(deptMap.get(kv.trim())));
}
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q7NameDeptOfStartJ");
job.setJobName("Q7NameDeptOfStartJ");
// 设置Mapper和Reduce类
job.setJarByClass(Q7NameDeptOfStartJ.class);
job.setMapperClass(MapClass.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);
System.exit(res);
}
}
3.7.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q7NameDeptOfStartJ.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q7NameDeptOfStartJ.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.classmv *.jar ../..rm Q7NameDept*.class3.7.5 运行并查看结果运行Q7NameDeptOfStartJ时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out7运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7运行成功后,刷新CentOS HDFS中的输出路径/class6/out7目录hadoop fs -ls /class6/out7hadoop fs -cat /class6/out7/part-r-00000打开part-r-00000文件,可以看到运行结果:JAMESSALESJONESRESEARCH3.8 测试例子8:列出工资最高的头三名员工姓名及其工资3.8.1 问题分析求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为"员工姓名,员工工资";最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。3.8.2 处理流程图3.8.3 编写代码
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q8SalaryTop3Salary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
String[] kv = value.toString().split(",");
// 输出key为0和value为员工姓名+","+员工工资
context.write(new IntWritable(0), new Text(kv.trim() + "," + kv.trim()));
}
}
public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 定义工资前三员工姓名
String empName;
String firstEmpName = "";
String secondEmpName = "";
String thirdEmpName = "";
// 定义工资前三工资
long empSalary = 0;
long firstEmpSalary = 0;
long secondEmpSalary = 0;
long thirdEmpSalary = 0;
// 通过冒泡法遍历所有员工,比较员工工资多少,求出前三名
for (Text val : values) {
empName = val.toString().split(",");
empSalary = Long.parseLong(val.toString().split(","));
if(empSalary > firstEmpSalary) {
thirdEmpName = secondEmpName;
thirdEmpSalary = secondEmpSalary;
secondEmpName = firstEmpName;
secondEmpSalary = firstEmpSalary;
firstEmpName = empName;
firstEmpSalary = empSalary;
} else if (empSalary > secondEmpSalary) {
thirdEmpName = secondEmpName;
thirdEmpSalary = secondEmpSalary;
secondEmpName = empName;
secondEmpSalary = empSalary;
} else if (empSalary > thirdEmpSalary) {
thirdEmpName = empName;
thirdEmpSalary = empSalary;
}
}
// 输出工资前三名信息
context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:" + firstEmpSalary));
context.write(new Text( "Second employee name:" + secondEmpName), new Text("Salary:" + secondEmpSalary));
context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:" + thirdEmpSalary));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q8SalaryTop3Salary");
job.setJobName("Q8SalaryTop3Salary");
// 设置Mapper和Reduce类
job.setJarByClass(Q8SalaryTop3Salary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputValueClass(Text.class);
// 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);
System.exit(res);
}
}
3.8.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q8SalaryTop3Salary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q8SalaryTop3Salary.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.classmv *.jar ../..rm Q8SalaryTop3*.class3.8.5 运行并查看结果运行Q8SalaryTop3Salary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out8运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8运行成功后,刷新CentOS HDFS中的输出路径/class6/out8目录hadoop fs -ls /class6/out8hadoop fs -cat /class6/out8/part-r-00000打开part-r-00000文件,可以看到运行结果:First employee name:KING Salary:5000Second employee name:FORD Salary:3000Third employee name:JONESSalary:29753.9测试例子9:将全体员工按照总收入(工资+提成)从高到低排列3.9.1 问题分析求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数据进行排序。3.9.2 处理流程图3.9.3 编写代码
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q9EmpSalarySort extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
String[] kv = value.toString().split(",");
// 输出key为员工所有工资和value为员工姓名
int empAllSalary = "".equals(kv) ? Integer.parseInt(kv) : Integer.parseInt(kv) + Integer.parseInt(kv);
context.write(new IntWritable(empAllSalary), new Text(kv));
}
}
/**
* 递减排序算法
*/
public static class DecreaseComparator extends IntWritable.Comparator {
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称
Job job = new Job(getConf(), "Q9EmpSalarySort");
job.setJobName("Q9EmpSalarySort");
// 设置Mapper和Reduce类
job.setJarByClass(Q9EmpSalarySort.class);
job.setMapperClass(MapClass.class);
// 设置输出格式类
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setSortComparatorClass(DecreaseComparator.class);
// 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
FileInputFormat.addInputPath(job, new Path(otherArgs));
FileOutputFormat.setOutputPath(job, new Path(otherArgs));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);
System.exit(res);
}
}
3.9.4 编译并打包代码进入/app/hadoop-1.1.2/myclass/class6目录中新建Q9EmpSalarySort.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java文件)cd /app/hadoop-1.1.2/myclass/class6vi Q9EmpSalarySort.java编译代码javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.classmv *.jar ../..rm Q9EmpSalary*.class3.9.5 运行并查看结果运行Q9EmpSalarySort运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:l员工数据路径:hdfs://hadoop:9000/class6/input/empl输出路径:hdfs://hadoop:9000/class6/out9运行如下命令:cd /app/hadoop-1.1.2hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9运行成功后,刷新CentOS HDFS中的输出路径/class6/out9目录hadoop fs -ls /class6/out9hadoop fs -cat /class6/out9/part-r-00000打开part-r-00000文件,可以看到运行结果:5000 KING3000 FORD2975 JONES2850 BLAKE......
下一篇:MapReduce应用案例2
http://www.aboutyun.com/thread-14932-1-1.html
谢谢分享 学习了。 111111111111111111111111111
非常感謝分享
正需要,謝分享!
楼主辛苦了,非常感谢分享! 谢谢分享这么好的资源
谢谢分享
受教了,学习学习!