分享

初识MapReduce需要解决的问题及通过maven编写MapReduce

pig2 发表于 2014-2-28 22:24:32 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 22816
本帖最后由 pig2 于 2014-2-28 22:27 编辑
阅读本文可以带着下面问题:
1.编写mapreduce如何在多台pc机上执行
2.map有几个参数,分别是什么含义?
3.reduce的作用是什么?


------------------------------------------------------------------------

我们刚接触mapreduce是否有这样的问题。hadoop是分布式的,而且每一台pc都存放有数据。我们编写mapreduce到底运行在什么地方?
如果你不产生这个问题,可能是因为对hadoop还没有真正的认识。

接下来我们产生这个问题该如何解决这个问题?
这里引用“敏小小”的博客使用maven构建hadoop开发环境

程序员的工作就是实现map和reduce函数,其它并行编程中的种种问题,比如分布式存储,工作调度,负载平衡等都有mapreduce框架去处理。

所以我们编写的mapreduce在其他pc机上执行,不是我们所关心的,而是有mapreduce来完成的。
这里同样引用:Hadoop简介(1):什么是Map/Reduce

上面都是从理论上来说明什么是MapReduce,那么咱们在MapReduce产生的过程和代码的角度来理解这个问题。
如果想统计下过去10年计算机论文出现最多的几个单词,看看大家都在研究些什么,那收集好论文后,该怎么办呢?

方法一:
我可以写一个小程序,把所有论文按顺序遍历一遍,统计每一个遇到的单词的出现次数,最后就可以知道哪几个单词最热门了。 这种方法在数据集比较小时,是非常有效的,而且实现最简单,用来解决这个问题很合适。

方法二:
写一个多线程程序,并发遍历论文。
这个问题理论上是可以高度并发的,因为统计一个文件时不会影响统计另一个文件。当我们的机器是多核或者多处理器,方法二肯定比方法一高效。但是写一个多线程程序要比方法一困难多了,我们必须自己同步共享数据,比如要防止两个线程重复统计文件。

方法三:
把作业交给多个计算机去完成。
我们可以使用方法一的程序,部署到N台机器上去,然后把论文集分成N份,一台机器跑一个作业。这个方法跑得足够快,但是部署起来很麻烦,我们要人工把程序copy到别的机器,要人工把论文集分开,最痛苦的是还要把N个运行结果进行整合(当然我们也可以再写一个程序)。

方法四:
让MapReduce来帮帮我们吧!
MapReduce本质上就是方法三,但是如何拆分文件集,如何copy程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给MapReduce。

------------------------------------------------------------------------------------------------------------------------------------------------

编写Java程序的方式有很多种,这里通过maven来编写MapReduce

创建maven工程并加入hadoop依赖

我们选用maven来管理工程,用自己喜爱的m2eclipse插件在eclipse里创建或在命令行里创建一个工程。在pom.xml里加入hadoop依赖。
  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-core</artifactId>
  4. <version>0.20.2</version>
  5. </dependency>
  6. <repositories>
  7. <repository>
  8.   <id>cloudera</id>
  9.   <url>https://repository.cloudera.com/content/groups/public</url>
  10. </repository>
  11. </repositories>
复制代码
运行mvn eclipse:eclipse命令后,将工程导入eclipse,可以看到以下相关的依赖。

Ok,现在开始我们第一个MapReduce程序,用这个程序实现字数统计功能。




概述

一个简单的MapReduce程序需要三样东西

1.实现Mapper,处理输入的对,输出中间结果

2.实现Reduce,对中间结果进行运算,输出最终结果

3.在main方法里定义运行作业,定义一个job,在这里控制job如何运行等。

编写Map类
  1. public  class WordCountMapper
  2.        extends Mapper <Object, Text, Text, IntWritable>{
  3.     private final static IntWritable one = new IntWritable(1);
  4.     private Text word = new Text();
  5.     public void map(Object key, Text value, Context context
  6.                     ) throws IOException, InterruptedException {
  7.       StringTokenizer itr = new StringTokenizer(value.toString());
  8.       while (itr.hasMoreTokens()) {
  9.         word.set(itr.nextToken());
  10.         context.write(word, one);
  11.       }
  12.     }
  13.   }
复制代码
Mapper接口是一个泛型,有4个形式的参数类型,分别指定map函数的输入键,输入值,输出键,输出值。

就上面的示例来说,输入键没有用到(实际代表行在文本中格的位置,没有这方面的需要,所以忽略),输入值是一样文本,输出键为单词,输出值为整数代表单词出现的次数。

需要注意的是Hadoop规定了自己的一套可用于网络序列优化的基本类型,而不是使用内置的java类型,这些都在org.apache.hadoop.io包中定义,上面使用的Text类型相当于java的String类型,IntWritable类型相当于java的Integer类型。除此之外,看不到任何分布式编程的细节,一切都是那么的简单。

编写Reduce类
  1. public class WordCountReducer extends
  2.                 Reducer <Text, IntWritable, Text, IntWritable> {
  3.         private IntWritable result = new IntWritable();
  4.         public void reduce(Text key, Iterable values, Context context)
  5.                         throws IOException, InterruptedException {
  6.                 int sum = 0;
  7.                 for (IntWritable val : values) {
  8.                         sum += val.get();
  9.                 }
  10.                 result.set(sum);
  11.                 context.write(key, result);
  12.         }
  13. }
复制代码
同样,Reducer接口的四个形式参数类型指定了reduce函数的输入和输出类型。在上面的例子中,输入键是单词,输入值是单词出现的次数,将单词出现的次数进行叠加,输出单词和单词总数。


定义job
  1. public class WordCount {
  2.   public static void main(String[] args) throws Exception {
  3.      Configuration conf = new Configuration();
  4.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  5.          if (otherArgs.length != 2) {
  6.            System.err.println("Usage: wordcount  ");
  7.            System.exit(2);
  8.          }
  9.          /**创建一个job,起个名字以便跟踪查看任务执行情况**/
  10.          Job job = new Job(conf, "word count");
  11.          /**当在hadoop集群上运行作业时,需要把代码打包成一个jar文件(hadoop会在集群分发这个文件),通过job的setJarByClass设置一个类,hadoop根据这个类找到所在的jar文件**/
  12.         job.setJarByClass(WordCount.class);
  13.         
  14.         /**设置要使用的map、combiner、reduce类型**/
  15.         job.setMapperClass(WordCountMapper.class);   
  16.         job.setCombinerClass(WordCountReducer.class);
  17.         job.setReducerClass(WordCountReducer.class);
  18.         
  19.        /**设置map和reduce函数的输入类型,这里没有代码是因为我们使用默认的TextInputFormat,针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value>: 对,key 是行在文件中的位置,value 是文件中的一行**/
  20.         /**设置map和reduce函数的输出键和输出值类型**/
  21.         job.setOutputKeyClass(Text.class);
  22.         job.setOutputValueClass(IntWritable.class);
  23.         /**设置输入和输出路径**/
  24.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  25.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  26.        /**提交作业并等待它完成**/
  27.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  28.       }
  29. }
复制代码
基本上完成一个MapReduce程序就这么简单,复杂的在于job的配置有着复杂的属性参数,如文件分割策略、排序策略、map输出内存缓冲区的大小、工作线程数量等,深入理解掌握这些参数才能使自己的MapReduce程序在集群环境中运行的最优。





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条