分享

MapReduce初级案例(3):使用MapReduce实现平均成绩

hyj 发表于 2014-3-3 22:25:51 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 31 107966
本帖最后由 hyj 于 2014-3-3 22:37 编辑

当我们看到这个例子的时候,我们是否想过:
mapreduce是否可以完成我们传统开发中经常遇到的一些任务。例如排序、平均数、批量word转换等。它和我们传统开发有什么不同。
那么我们可以带着下面问题来阅读:
1.mapreduce是如何求平均值的?
2.map在求平均值的作用是什么?
3.reduce在求平均值的作用是什么?

一、简介:
"平均成绩"主要目的还是在重温经典"WordCount"例子,可以说是在基础上的微变化版,该实例主要就是实现一个计算学生平均成绩的例子。

二、实例描述

对输入文件中数据进行就算学生平均成绩。输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩。

    样本输入:

    1)math:
  1. 张三    88
  2. 李四    99
  3. 王五    66
  4. 赵六    77
复制代码
2)chinese :
  1. 张三    78
  2. 李四    89
  3. 王五    96
  4. 赵六    67
复制代码
3)english:
  1. 张三    80
  2. 李四    82
  3. 王五    84
  4. 赵六    86
复制代码
样本输出:
  1. 张三    82
  2. 李四    90
  3. 王五    82
  4. 赵六    76
复制代码

三、设计思路


计算学生平均成绩是一个仿"WordCount"例子,用来重温一下开发MapReduce程序的流程。程序包括两部分的内容:Map部分和Reduce部分,分别实现了map和reduce的功能。

Map处理的是一个纯文本文件,文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,每一个InputSlit将由一个Mapper负责处理。此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value>对,key是行在文本中的位置,value是文件中的一行。

Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。

Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。


四、程序代码


程序代码如下所示:
  1. package com.hebut.mr;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import java.util.StringTokenizer;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.LongWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. public class Score {
  19.     public static class Map extends
  20.             Mapper<LongWritable, Text, Text, IntWritable> {
  21.         // 实现map函数
  22.         public void map(LongWritable key, Text value, Context context)
  23.                 throws IOException, InterruptedException {
  24.             // 将输入的纯文本文件的数据转化成String
  25.             String line = value.toString();
  26.             // 将输入的数据首先按行进行分割
  27.             StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
  28.             // 分别对每一行进行处理
  29.             while (tokenizerArticle.hasMoreElements()) {
  30.                 // 每行按空格划分
  31.                 StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
  32.                 String strName = tokenizerLine.nextToken();// 学生姓名部分
  33.                 String strScore = tokenizerLine.nextToken();// 成绩部分
  34.                 Text name = new Text(strName);
  35.                 int scoreInt = Integer.parseInt(strScore);
  36.                 // 输出姓名和成绩
  37.                 context.write(name, new IntWritable(scoreInt));
  38.             }
  39.         }
  40.     }
  41.     public static class Reduce extends
  42.             Reducer<Text, IntWritable, Text, IntWritable> {
  43.         // 实现reduce函数
  44.         public void reduce(Text key, Iterable<IntWritable> values,
  45.                 Context context) throws IOException, InterruptedException {
  46.             int sum = 0;
  47.             int count = 0;
  48.             Iterator<IntWritable> iterator = values.iterator();
  49.             while (iterator.hasNext()) {
  50.                 sum += iterator.next().get();// 计算总分
  51.                 count++;// 统计总的科目数
  52.             }
  53.             int average = (int) sum / count;// 计算平均成绩
  54.             context.write(key, new IntWritable(average));
  55.         }
  56.     }
  57.     public static void main(String[] args) throws Exception {
  58.         Configuration conf = new Configuration();
  59.         // 这句话很关键
  60.         conf.set("mapred.job.tracker", "192.168.1.2:9001");
  61.         String[] ioArgs = new String[] { "score_in", "score_out" };
  62.         String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
  63.         if (otherArgs.length != 2) {
  64.             System.err.println("Usage: Score Average <in> <out>");
  65.             System.exit(2);
  66.         }
  67.         Job job = new Job(conf, "Score Average");
  68.         job.setJarByClass(Score.class);
  69.         // 设置Map、Combine和Reduce处理类
  70.         job.setMapperClass(Map.class);
  71.         job.setCombinerClass(Reduce.class);
  72.         job.setReducerClass(Reduce.class);
  73.         // 设置输出类型
  74.         job.setOutputKeyClass(Text.class);
  75.         job.setOutputValueClass(IntWritable.class);
  76.         // 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
  77.         job.setInputFormatClass(TextInputFormat.class);
  78.         // 提供一个RecordWriter的实现,负责数据输出
  79.         job.setOutputFormatClass(TextOutputFormat.class);
  80.         // 设置输入和输出目录
  81.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  82.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  83.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  84.     }
  85. }
复制代码
四、代码结果

1)准备测试数据
    通过Eclipse下面的"DFS Locations"在"/user/hadoop"目录下创建输入文件"score_in"文件夹(备注:"score_out"不需要创建。)如图3.4-1所示,已经成功创建。

1.png              2.png
图3.4-1 创建"score_in"                                                       图3.4.2 上传三门分数

    然后在本地建立三个txt文件,通过Eclipse上传到"/user/hadoop/score_in"文件夹中,三个txt文件的内容如"实例描述"那三个文件一样。如图3.4-2所示,成功上传之后。
    备注:文本文件的编码为"UTF-8",默认为"ANSI",可以另存为时选择,不然中文会出现乱码。
    从SecureCRT远处查看"Master.Hadoop"的也能证实我们上传的三个文件。

3.png

查看三个文件的内容如图3.4-3所示:

4.png
图3.4.3 三门成绩的内容
2)查看运行结果
    这时我们右击Eclipse的"DFS Locations"中"/user/hadoop"文件夹进行刷新,这时会发现多出一个"score_out"文件夹,且里面有3个文件,然后打开双其"part-r-00000"文件,会在Eclipse中间把内容显示出来。如图3.4-4所示。

5.png
图3.4-4 运行结果



已有(33)人评论

跳转到指定楼层
nettman 发表于 2014-3-3 22:57:48
从上面可以看出map所做的同样是分割,处理的过程,属于对原材料的处理加工真正求平均值则是在reduce中
回复

使用道具 举报

pig2 发表于 2014-3-4 09:47:55
上面涉及到Iterator,这里需要补充一下

迭代器(Iterator)

  迭代器是一种设计模式,它是一个对象,它可以遍历并选择序列中的对象,而开发人员不需要了解该序列的底层结构。迭代器通常被称为“轻量级”对象,因为创建它的代价小。

  Java中的Iterator功能比较简单,并且只能单向移动:

  (1) 使用方法iterator()要求容器返回一个Iterator。第一次调用Iterator的next()方法时,它返回序列的第一个元素。注意:iterator()方法是java.lang.Iterable接口,被Collection继承。

  (2) 使用next()获得序列中的下一个元素。

  (3) 使用hasNext()检查序列中是否还有元素。

  (4) 使用remove()将迭代器新返回的元素删除。

  Iterator是Java迭代器最简单的实现,为List设计的ListIterator具有更多的功能,它可以从两个方向遍历List,也可以从List中插入和删除元素。

迭代器应用:
list l = new ArrayList();
l.add("aa");
l.add("bb");
l.add("cc");
for (Iterator iter = l.iterator(); iter.hasNext();) {
  String str = (String)iter.next();
  System.out.println(str);
}
/*迭代器用于while循环
Iterator iter = l.iterator();
while(iter.hasNext()){
  String str = (String) iter.next();
  System.out.println(str);
}
*/

回复

使用道具 举报

472827074@qq.co 发表于 2014-4-14 13:59:51
顶起来顶起来顶起来顶起来顶起来
回复

使用道具 举报

hadoop求索 发表于 2014-4-26 10:17:56
在写map的过程中,hadoop默认的以行读取数据,楼主这里没有必要按行分割把?
回复

使用道具 举报

mehao 发表于 2014-6-25 16:56:13
有问题,楼主,帮忙解答一下,就是你上面的例子,上面是输入文件是三个,分别存放着不同科目的学生的成绩,问题是当用一个文件作为输入,将学生的多科目多成绩存进去,简而言之就是之前的三个文件合并成一个文件作为输入时,输出的时候,显示的结果就有问题,,,,这是为什么????
回复

使用道具 举报

howtodown 发表于 2014-6-25 17:09:36
mehao 发表于 2014-6-25 16:56
有问题,楼主,帮忙解答一下,就是你上面的例子,上面是输入文件是三个,分别存放着不同科目的学生的成绩, ...

这个程序的逻辑是按照不同科目,文件不同来设计的。如果你一旦合并,程序没有改变,还是按照原先的逻辑,所以肯定会不一样的。
回复

使用道具 举报

mehao 发表于 2014-6-26 10:43:30
本帖最后由 howtodown 于 2014-6-26 13:02 编辑
howtodown 发表于 2014-6-25 17:09
这个程序的逻辑是按照不同科目,文件不同来设计的。如果你一旦合并,程序没有改变,还是按照原先的逻辑, ...

这个例子不是测试平均值的吗;我求某个人的成绩的平均分,这个时候,我的目的很简单,只想知道某个人的平均分是多少,我不管它是哪个科目,从上述写的程序也符合要求,最后同样输出的也是某个人的平均分,这和不同科目之间没有关系吧。
貌似不是这样的吧,不妨做个测试:测试一:以一个编码格式为ANSI的txt文件,里面存放有 如图所示,将此文件作为 input 输入文件,进行MR求平均值,结束后查看output内容,结果显示平均值均正确;

测试二:以一个 编码格式为utf-8的txt文件,里面存放有数据(和【测试一】一样的内容),将北文件作为intput输入文件,和【测试一】一样的测试步骤运行MR程序,结束后,查看output内容,结果显示错误;

这是为什么?是不是说由于编码的影响导致 文件内容map切数据时,不能合并,如上图中的,字符C,在map中,存在两个key值,导致给reduce的时候合并最后输出就是两个C,对应两个value值;请高手帮帮忙指导下??刚学不怎么懂;谢谢了

编码格式为ansi的txt

编码格式为ansi的txt


回复

使用道具 举报

howtodown 发表于 2014-6-26 13:09:39
mehao 发表于 2014-6-26 10:43
这个例子不是测试平均值的吗;我求某个人的成绩的平均分,这个时候,我的目的很简单,只想知道某个人的平 ...
我不管它是哪个科目,从上述写的程序也符合要求,最后同样输出的也是某个人的平均分,这和不同科目之间没有关系吧。
这个有关系,建议你先实践,然后在讨论问题。一个人的平均成绩:总成绩/科目,你只有总成绩,没有科目,就不会有平均成绩。

回复

使用道具 举报

hyj 发表于 2014-6-26 13:17:22
mehao 发表于 2014-6-26 10:43
这个例子不是测试平均值的吗;我求某个人的成绩的平均分,这个时候,我的目的很简单,只想知道某个人的平 ...

小伙子,研究的很深,字符的变动,可能会引起内容的变化,至于会不会造成切分成功,你的前后两个帖子不一致。你原先的问题是什么原因造成的。
回复

使用道具 举报

1234下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条