本帖最后由 pig2 于 2015-3-28 00:29 编辑
问题导读: 1、 WordCount算法编程实现。 2、矩阵乘法: 矩阵乘法原理和实现思路
MapReduce可广泛使用于各种基础算法,本文档介绍了如何在大数据场景下使用MapReduce进行一些典型基础算法的设计与实现,这些基础算法包括:最基本的WordCount算法、矩阵算法、各种关系代数运算、单词共现算法、文档倒排索引算法、PageRank网页排名算法,最后介绍一组简单的基于MapReduce的专利文献数据分析统计算法。
1 WordCount 1.1 WordCount算法编程实现 WordCount是Hadoop自带的示例程序之一,整个程序虽然简单却涵盖了MapReduce的最基本的使用方法。一般我们学习一门程序设计语言,最开始的上手程序都是”Hello World”,可是说WordCount就是学习掌握Hadoop MapReduce编程的“Hello World”。 WordCount的功能是统计输入文件也可以是输入文件夹内的多个文件)中每个单词出现的次数。几本的解决思路也很直观,就是将文本内容切分成单词,将其中相同的单词汇聚在一起,统计其数量作为该单词的出现次数输出。 WordCount的Mapper实现代码如下: //继承自Mapper接口,输入类型<Object,Text> //输出类型为<Text,IntWritable> publicstatic class TokenizerMapper extents Mapper<Object, Text, Text,IntWritable>{ //one是对整数1的 IntWritable型封装 private final static IntWritable one = newIntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Contextcontext) throws IOException,InterruptedException{ StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()){ word.set(itr.nextToken());//word存储被切割出来的单词 context.write(word,one); } }//map()函数结束 } 该类map方法调用默认的LineRecordReader,得到的value值是文本文件中的一行(以回车符号作为结束标记),key值为该行相对于文本文件首地址的偏移量。之后StringTokenizer类将value值拆分成一个个单词,并将<word, 1>作为map方法的key-value对输出,其中前面已经介绍过了IntWritable和Text是Hadoop对Int和String类的封装。 //继承自Reducer接口,输入类型<Text,IntWritable> //输出类型为<Text,IntWritable> publicstatic class IntSumReducer extents Reducer<Text, IntWritable, Text,IntWritable>{ //one是对整数1的 IntWritable型封装 private IntWritable result = new IntWritable(1);
public void reduce(Text key, Iterable<IntWritable>values, Context context) throws IOException,InterruptedException{ int sum = 0; //根据得到的<key,list{value}>计算value的和值,即单词的出现次数 for (IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key,result); }//reduce()函数结束 } Reducer函数从Map端得到形如<word,{1,1,….}> 的输出,根据这些value值累加得到该单词的出现次数并输出。 需要注意的是,在这个程序中,设置了Combiner函数。为了减轻Mapper和Reducer之间的数据传输开销,Combiner对Map输出的中间结果数据进行适当的合并,将本地Map节点输出的主键相同的键值对进行合并,再输出给Reducer,以此大量减少从Map节点到Reducer节点中间结果数据的传输量。由于本列中Combiner与Reducer类有完全相同的实现,因此,可直接使用上述的Reducer类作为Combiner使用。为此,在WordCount程序的作业配置程序中,需要有以下的设置语句: Job.setCombinerClass(IntSumReducer.class); 程序运行时的命令行代码如下: hadoopjar $HADOOP_HOME/hadoop-examles-0.20.205.0.jar \wordcount <inputPath> <outputPath> 下面用图示的方法具体解释WordCount程序的运行过程。 这里我们假设有两个输入文本文件,输入数据经过默认的LineRecordReader被分割成一行行数据,再经由map()得到<key, value>对,Map过程如图所示: 对输入文本执行map()方法得到的中间结果 map()方法输出的<key,value>对,在Map端海辉按照key值进行排序,最后交由Combiner得到Mapper部分的最终输出结果,如图所示: 经由Combiner后得到的Mapper的最终输出结果 Reducer对从Mapper端接收的数据进行排序,之后有reduce()方法进行处理,将相同主键下的所有值相家,得到新的<key,value>对作为最终的输出结果,这个过程如图所示: Reducer的最终输出结果 当WordCount进行词频统计的对象是包含大量小文本文件的目录时,鱿鱼WordCount定义的InputFormat是默认的TextInputFormat,每一个文件都至少是一个Split,当有大量小文件文本需要统计时,虽然每次Map任务只会处理少量数据,但是会有大量的Map任务,对于Hadoop性能造成了相当大的影响。所以另外一种思路就产生了,将大量的小文件合并成一个Split,减少Map任务数量。Had哦婆婆的示例程序MultiFileWordCount,该程序自定义了RecordReader。将文本内容不断读入而不分片切割成Split,其中的Mapper和Reducer部分泽与WordCount几乎一样。有兴趣的读者请参见Hadoop自带的本示例程序。
矩阵乘法 1、 矩阵乘法原理和实现思路 并行化矩阵乘法是可以用MapReduce实现的一项基础算法,最早Google公司将MapReduce引入到工业界的原因是要解决PageRank(后期见网页排名算法)中包含的大量矩阵乘法运算。在进一步介绍算法实现前。先回顾一下矩阵乘法的定义。 对于任意矩阵M和N,若矩阵M的列数等于矩阵N的行数,则记M和N的乘积P=M*N。其中mij记作矩阵M的第i行第j列的元素,njk记作矩阵N的第j行第k列的元素,则其乘积矩阵P的元素可由下式求得:
由上述公式可以看出,决定最后pik位置是 (i,k),所以可以将其作为Reducer的输入key值。而为了求出mijnjk,我们需要分别知道mij和 njk。对于mij,其所需要的属性有矩阵名称M,所在行数i,所在列数j,和其本身的数值大小mij;同样对应njk,其所需要的属性矩阵名称N,所在行数j,所在列数k,和其本身的数值大小njk。这些属性值由Mapper处理得到,基本处理思路如下。 Map函数:对于矩阵M中的每个元素mij,产生一系列的key-value对<(i,k),(M,j,mij)>,其中k=1,2…直到矩阵N的总列数;对于矩阵N中的每个元素njk,产生一系列的key-value对<(j,k),(N,j,njk)>,其中i=1,2…直到矩阵M的总列数, Reduce函数:对于每个键(i,k)相关联的值(M,j,mjk)及(N,j,njk),根据相同的j值将mij和njk分别存入不同数组中,然后将两者的第j个元素抽取出来分别相乘,最后相加,即可得到pik的值。 以下以实际示例进行解释。 设矩阵M[1,2],矩阵
,其中,i=1,j=1,2,k=1,2,3。经过map()函数之后得到如下的输出,请注意,为便于观察,横向上我们可以对Map过程的中间输出结果按照j值进行放置: <(1,1),(M,1,m11)><(1,1),(N,1,n11)><(1,1),(M,2,m12)><(1,1),(N,2,n21)> <(1,2),(M,1,m11)><(1,2),(N,1,n12)><(1,2),(M,2,m12)><(1,2),(N,2,n22)> <(1,3),(M,1,m11)><(1,3),(N,1,n13)><(1,3),(M,2,m12)><(1,3),(N,2,n23)> Reduce函数对于输入的每个key值(i,k),根据j值进行抽取出对应的元素mij和njk相乘,然后再累加。 对于key值为(1,1)的输入:m11 Xn11+ m12 X n21=1X2+2X0=2, 对于key值为(1,2)的输入:m11 Xn12+ m12 X n22=1X1+2X2=5, 对于key值为(1,3)的输入:m11 Xn13+ m12 X n23=1X3+2X4=11,
2、 矩阵乘法的MapReduce程序实现 具体实现矩阵乘法的MapReduce程序时,可以有以下的思路:一共有两个输入文本文件,分别存放矩阵M和N的元素,文件内容每一行的形式是“行坐标,列坐标\t元素数值”(\t是一个Tab键间隔符,用以区分开坐标和元素数值)。在本例中,我们并不关心矩阵实际元素的数据,所有元素数据用随机数产生,具体的测试数据可以用下面的shell脚本生成。 #!/bin/bash for i in 'seq 1 $1' do for j in 'seq 1 $2' do s=$((RANDOM%100)) echo-e "$i,$j\t$s" >>M_$1_$2 done done for i in 'seq 1 $2' do for j in 'seq 1 $3' do s=$((RANDOM%100)) echo-e "$i,$j\t$s" >>M_$2_$3 done done 注意,该脚本运行时需要三个参数,分别是矩阵M的行数和列数以及矩阵N的列数,每个矩阵的行数和列数保存在文件名中,以方便之后的MapReduce程序处理。 矩阵乘法Mapper类程序代码如下: public static class MatrixMapperextents Mapper<Object, Text, Text, Text>{ private Text map_key = new Text(); private Text map_value = new Text(); /** *执行map()函数前先由cond.get()得到main函数中提供的必要变量 *也就是从输入文件名中得到的矩阵维度信息 */ public void setup(Context context)throws IOException{ Configuration conf =context.getConfiguration(); columnN =Integer.parseInt(conf.get("columnN")); rowM =Integer.parseInt(conf.get("rowM")); } public void map(Object key, Text value,Context context) throws IOException ,InterruptedException{ //得到输入文件名,从而却分输入矩阵M和N FileSplit fileSplit =(FileSplit) context.getInputSplit(); String fileName =fileSplit.getPath().getName();
if(fileName.contains("M")){ String[] tuple =value.toString().split(","); int i =Integer.parseInt(tuple[0]); String[] tuples =tuple[1].split("\t"); int j =Integer.parseInt(tuples[0]); int Mij =Integer.parseInt(tuples[1]); for(int k = 1; k <columnN + 1; k++){ map_key.set(i+ "," +k); map_value.set("M"+ "," + j + "," +Mij); context.write(map_key,map_value); } //for循环结束 }elseif(fileName.contains("N")){ String[] tuple =value.toString().split(","); int j =Integer.parseInt(tuple[0]); String[] tuples =tuple[1].split("\t"); int k =Integer.parseInt(tuples[0]); int Njk =Integer.parseInt(tuples[1]); for(int i = 1; k <rowM + 1; i++){ map_key.set(i+ "," +k); map_value.set("N"+ "," + j + "," +Njk); context.write(map_key,map_value); } //for循环结束 } } //map()函数结束 } 对每一行数据,根据间隔符号进行分割,这样我们就得到了形如<(2,2),(M,3,7)>这种格式的key-value对,从而输出给Reducer,Reducer类的程序代码如下:
publicstatic class MatirReducer extents Reducer<Text, Text, Text, Text>{ private int sum = 0; public void setup(Context context) throwsIOException{ Configuration conf = context.getConfiguration(); columnM =Integer.parseInt(conf.get("columnM")); }
public static class IntSumReducer extentsReducer<Text, IntWritable, Text, IntWritable>{ //one是对整数1的 IntWritable型封装 private IntWritable result = new IntWritable(1);
public void reduce(Text key,Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{ //定义两个数组,根据j值,分别存放矩阵M和N中的元素 int[] M = new Int[columnM + 1]; int[] N = new Int[columnM + 1];
for (Text val : values){ String[] tuple =value.toString().split(","); if(tuple[0].equals("M")){ M[Integer.parseInt(tuple[1])]= Integer.parseInt(tuple[2]); }else if(){ N[Integer.parseInt(tuple[1])]= Integer.parseInt(tuple[2]); } } //for循环结束 for(int j = 1; j < columnM + 1;j++){ sum += M[j] * N[j]; }
context.write(key,newText(Integer.toString(sum))); sum = 0; }//reduce()函数结束 } 值得注意的是,关于rowM,cloumnM和cloumnN这三个需要共享的变量,可以通过main()函数从运行参数的文件名中得到,并通过conf.getInt()方法设置成每个Map和Reduce节点都可以全局共享的变量。Mapper和Reducer可以通过conf.get()方法读取到具体的数值。
相关帖子:
MapReduce 基础算法程序设计(1)
MapReduce 基础算法程序设计(2)
MapReduce 基础算法程序设计(3):单词共现算法 |