分享

MapReduce 基础算法程序设计(1)

本帖最后由 pig2 于 2015-3-28 00:29 编辑

问题导读:
1、 WordCount算法编程实现。
2、矩阵乘法: 矩阵乘法原理和实现思路




MapReduce可广泛使用于各种基础算法,本文档介绍了如何在大数据场景下使用MapReduce进行一些典型基础算法的设计与实现,这些基础算法包括:最基本的WordCount算法、矩阵算法、各种关系代数运算、单词共现算法、文档倒排索引算法、PageRank网页排名算法,最后介绍一组简单的基于MapReduce的专利文献数据分析统计算法。

1        WordCount
1.1  WordCount算法编程实现
WordCount是Hadoop自带的示例程序之一,整个程序虽然简单却涵盖了MapReduce的最基本的使用方法。一般我们学习一门程序设计语言,最开始的上手程序都是”Hello  World”,可是说WordCount就是学习掌握Hadoop  MapReduce编程的“Hello World”。
          WordCount的功能是统计输入文件也可以是输入文件夹内的多个文件)中每个单词出现的次数。几本的解决思路也很直观,就是将文本内容切分成单词,将其中相同的单词汇聚在一起,统计其数量作为该单词的出现次数输出。
WordCount的Mapper实现代码如下:
//继承自Mapper接口,输入类型<Object,Text>
//输出类型为<Text,IntWritable>
publicstatic class TokenizerMapper extents Mapper<Object, Text, Text,IntWritable>{
//one是对整数1的 IntWritable型封装
private final static IntWritable one = newIntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Contextcontext)
                   throws IOException,InterruptedException{
          StringTokenizer itr = new  StringTokenizer(value.toString());
          while (itr.hasMoreTokens()){
                   word.set(itr.nextToken());//word存储被切割出来的单词
                   context.write(word,one);
          }
}//map()函数结束
}
该类map方法调用默认的LineRecordReader,得到的value值是文本文件中的一行(以回车符号作为结束标记),key值为该行相对于文本文件首地址的偏移量。之后StringTokenizer类将value值拆分成一个个单词,并将<word, 1>作为map方法的key-value对输出,其中前面已经介绍过了IntWritable和Text是Hadoop对Int和String类的封装。
//继承自Reducer接口,输入类型<Text,IntWritable>
//输出类型为<Text,IntWritable>
publicstatic class IntSumReducer extents Reducer<Text, IntWritable, Text,IntWritable>{
//one是对整数1的 IntWritable型封装
private IntWritable result = new IntWritable(1);

public void  reduce(Text key, Iterable<IntWritable>values, Context context)
                   throws IOException,InterruptedException{
          int sum = 0;
          //根据得到的<key,list{value}>计算value的和值,即单词的出现次数
          for (IntWritable val : values){
                   sum += val.get();
          }
          result.set(sum);
          context.write(key,result);
}//reduce()函数结束
}
         Reducer函数从Map端得到形如<word,{1,1,….}> 的输出,根据这些value值累加得到该单词的出现次数并输出。
         需要注意的是,在这个程序中,设置了Combiner函数。为了减轻Mapper和Reducer之间的数据传输开销,Combiner对Map输出的中间结果数据进行适当的合并,将本地Map节点输出的主键相同的键值对进行合并,再输出给Reducer,以此大量减少从Map节点到Reducer节点中间结果数据的传输量。由于本列中Combiner与Reducer类有完全相同的实现,因此,可直接使用上述的Reducer类作为Combiner使用。为此,在WordCount程序的作业配置程序中,需要有以下的设置语句:
         Job.setCombinerClass(IntSumReducer.class);
程序运行时的命令行代码如下:
       hadoopjar $HADOOP_HOME/hadoop-examles-0.20.205.0.jar \wordcount  <inputPath>  <outputPath>
         下面用图示的方法具体解释WordCount程序的运行过程。
         这里我们假设有两个输入文本文件,输入数据经过默认的LineRecordReader被分割成一行行数据,再经由map()得到<key, value>对,Map过程如图所示:
                                                            对输入文本执行map()方法得到的中间结果.png
对输入文本执行map()方法得到的中间结果
         map()方法输出的<key,value>对,在Map端海辉按照key值进行排序,最后交由Combiner得到Mapper部分的最终输出结果,如图所示:
                                                            经由Combiner后得到的Mapper的最终输出结果.png
经由Combiner后得到的Mapper的最终输出结果
         Reducer对从Mapper端接收的数据进行排序,之后有reduce()方法进行处理,将相同主键下的所有值相家,得到新的<key,value>对作为最终的输出结果,这个过程如图所示:
                                                                   Reducer的最终输出结果.png
Reducer的最终输出结果
         当WordCount进行词频统计的对象是包含大量小文本文件的目录时,鱿鱼WordCount定义的InputFormat是默认的TextInputFormat,每一个文件都至少是一个Split,当有大量小文件文本需要统计时,虽然每次Map任务只会处理少量数据,但是会有大量的Map任务,对于Hadoop性能造成了相当大的影响。所以另外一种思路就产生了,将大量的小文件合并成一个Split,减少Map任务数量。Had哦婆婆的示例程序MultiFileWordCount,该程序自定义了RecordReader。将文本内容不断读入而不分片切割成Split,其中的Mapper和Reducer部分泽与WordCount几乎一样。有兴趣的读者请参见Hadoop自带的本示例程序。


矩阵乘法
1、 矩阵乘法原理和实现思路
并行化矩阵乘法是可以用MapReduce实现的一项基础算法,最早Google公司将MapReduce引入到工业界的原因是要解决PageRank(后期见网页排名算法)中包含的大量矩阵乘法运算。在进一步介绍算法实现前。先回顾一下矩阵乘法的定义。
对于任意矩阵M和N,若矩阵M的列数等于矩阵N的行数,则记M和N的乘积P=M*N。其中mij记作矩阵M的第i行第j列的元素,njk记作矩阵N的第j行第k列的元素,则其乘积矩阵P的元素可由下式求得:
                              矩阵乘法公式.png

由上述公式可以看出,决定最后pik位置是 (i,k),所以可以将其作为Reducer的输入key值。而为了求出mijnjk,我们需要分别知道mij和 njk。对于mij,其所需要的属性有矩阵名称M,所在行数i,所在列数j,和其本身的数值大小mij;同样对应njk,其所需要的属性矩阵名称N,所在行数j,所在列数k,和其本身的数值大小njk。这些属性值由Mapper处理得到,基本处理思路如下。
                  Map函数:对于矩阵M中的每个元素mij,产生一系列的key-value对<(i,k),(M,j,mij)>,其中k=1,2…直到矩阵N的总列数;对于矩阵N中的每个元素njk,产生一系列的key-value对<(j,k),(N,j,njk)>,其中i=1,2…直到矩阵M的总列数,
          Reduce函数:对于每个键(i,k)相关联的值(M,j,mjk)及(N,j,njk),根据相同的j值将mij和njk分别存入不同数组中,然后将两者的第j个元素抽取出来分别相乘,最后相加,即可得到pik的值。
          以下以实际示例进行解释。
设矩阵M[1,2],矩阵 函数公式.png ,其中,i=1,j=1,2,k=1,2,3。经过map()函数之后得到如下的输出,请注意,为便于观察,横向上我们可以对Map过程的中间输出结果按照j值进行放置:
          <(1,1),(M,1,m11)><(1,1),(N,1,n11)><(1,1),(M,2,m12)><(1,1),(N,2,n21)>
          <(1,2),(M,1,m11)><(1,2),(N,1,n12)><(1,2),(M,2,m12)><(1,2),(N,2,n22)>
          <(1,3),(M,1,m11)><(1,3),(N,1,n13)><(1,3),(M,2,m12)><(1,3),(N,2,n23)>
Reduce函数对于输入的每个key值(i,k),根据j值进行抽取出对应的元素mij和njk相乘,然后再累加。
          对于key值为(1,1)的输入:m11 Xn11+ m12 X n21=1X2+2X0=2,
          对于key值为(1,2)的输入:m11 Xn12+ m12 X n22=1X1+2X2=5,
          对于key值为(1,3)的输入:m11 Xn13+ m12 X n23=1X3+2X4=11,

2、 矩阵乘法的MapReduce程序实现
具体实现矩阵乘法的MapReduce程序时,可以有以下的思路:一共有两个输入文本文件,分别存放矩阵M和N的元素,文件内容每一行的形式是“行坐标,列坐标\t元素数值”(\t是一个Tab键间隔符,用以区分开坐标和元素数值)。在本例中,我们并不关心矩阵实际元素的数据,所有元素数据用随机数产生,具体的测试数据可以用下面的shell脚本生成。
         #!/bin/bash
for i in 'seq 1 $1'
do
         for j in 'seq 1 $2'
         do
                  s=$((RANDOM%100))
                          echo-e "$i,$j\t$s" >>M_$1_$2
         done
done
for i in 'seq 1 $2'
do
         for j in 'seq 1 $3'
         do
                  s=$((RANDOM%100))
                          echo-e "$i,$j\t$s" >>M_$2_$3
         done
done
注意,该脚本运行时需要三个参数,分别是矩阵M的行数和列数以及矩阵N的列数,每个矩阵的行数和列数保存在文件名中,以方便之后的MapReduce程序处理。
         矩阵乘法Mapper类程序代码如下:
         public static class MatrixMapperextents Mapper<Object, Text, Text, Text>{
         private Text map_key = new Text();
         private Text map_value = new Text();
         /**
          *执行map()函数前先由cond.get()得到main函数中提供的必要变量
          *也就是从输入文件名中得到的矩阵维度信息
          */
         public void setup(Context context)throws IOException{
                  Configuration conf =context.getConfiguration();
                  columnN =Integer.parseInt(conf.get("columnN"));
                  rowM =Integer.parseInt(conf.get("rowM"));
         }
         public void map(Object key, Text value,Context context) throws IOException ,InterruptedException{
                  //得到输入文件名,从而却分输入矩阵M和N
                  FileSplit fileSplit =(FileSplit) context.getInputSplit();
                  String fileName =fileSplit.getPath().getName();

                  if(fileName.contains("M")){
                          String[] tuple =value.toString().split(",");
                          int i =Integer.parseInt(tuple[0]);
                          String[] tuples =tuple[1].split("\t");
                          int j =Integer.parseInt(tuples[0]);
                          int Mij =Integer.parseInt(tuples[1]);
                          for(int k = 1; k <columnN + 1; k++){
                                   map_key.set(i+ "," +k);
                                   map_value.set("M"+ "," + j + "," +Mij);
                                   context.write(map_key,map_value);
                          } //for循环结束
                  }elseif(fileName.contains("N")){
                          String[] tuple =value.toString().split(",");
                          int j =Integer.parseInt(tuple[0]);
                          String[] tuples =tuple[1].split("\t");
                          int k =Integer.parseInt(tuples[0]);
                          int Njk =Integer.parseInt(tuples[1]);
                          for(int i = 1; k <rowM + 1; i++){
                                   map_key.set(i+ "," +k);
                                   map_value.set("N"+ "," + j + "," +Njk);
                                   context.write(map_key,map_value);
                          } //for循环结束
                  }
    } //map()函数结束
}
          对每一行数据,根据间隔符号进行分割,这样我们就得到了形如<(2,2),(M,3,7)>这种格式的key-value对,从而输出给Reducer,Reducer类的程序代码如下:

publicstatic class MatirReducer extents Reducer<Text, Text, Text, Text>{
private int sum = 0;
public void setup(Context context) throwsIOException{
          Configuration conf = context.getConfiguration();
          columnM =Integer.parseInt(conf.get("columnM"));
}

public static class IntSumReducer extentsReducer<Text, IntWritable, Text, IntWritable>{
//one是对整数1的 IntWritable型封装
private IntWritable result = new IntWritable(1);

public void reduce(Text key,Iterable<IntWritable> values, Context context)
                   throws IOException,InterruptedException{
          //定义两个数组,根据j值,分别存放矩阵M和N中的元素
          int[] M = new Int[columnM + 1];
          int[] N = new Int[columnM + 1];

          for (Text val : values){
                   String[] tuple =value.toString().split(",");
                   if(tuple[0].equals("M")){
                            M[Integer.parseInt(tuple[1])]= Integer.parseInt(tuple[2]);
                   }else if(){
                            N[Integer.parseInt(tuple[1])]= Integer.parseInt(tuple[2]);
                   }
          } //for循环结束
          for(int j = 1; j < columnM + 1;j++){
                   sum += M[j] * N[j];
          }

          context.write(key,newText(Integer.toString(sum)));
          sum = 0;
}//reduce()函数结束
}
值得注意的是,关于rowM,cloumnM和cloumnN这三个需要共享的变量,可以通过main()函数从运行参数的文件名中得到,并通过conf.getInt()方法设置成每个Map和Reduce节点都可以全局共享的变量。Mapper和Reducer可以通过conf.get()方法读取到具体的数值。



相关帖子:
MapReduce 基础算法程序设计(1)

MapReduce 基础算法程序设计(2)

MapReduce 基础算法程序设计(3):单词共现算法
欢迎加入about云群425860289432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(1)人评论

跳转到指定楼层
Redgo 发表于 2015-4-2 11:40:13
努力,努力,努力
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条