pergrand 发表于 2016-7-9 23:19:49

Hadoop学习之mapreduce及示例代码

本帖最后由 pergrand 于 2016-7-9 23:30 编辑

三:MapReduce 是一种分布式计算模型。Mapreduce框架有默认实现,程序员只需要覆盖map()和reduce()两个函数。MapReduce的执行流程1.Map Task (以一个入门例子的单词计数为例,两行一定行是hello word第二行是hello you中间是制表符)         1.1读取:框架调用InputFormat类的子类读取HDFS中文件数据,把文件转换为InputSplit。默认,文件的一个block对应一个InputSplit,一个InputSplit对应一个map task。             一个InputSplit中的数据会被RecordReader解析成<k1,v1>。默认,InputSplit中的一行解析成一个<k1,v1>。默认,v1表示一行的内容,k1表示偏移量。读取的结果是<0,helloword>和<10,hello you> 10 是第二行的起始偏移量,这两个是<k1,v1>         1.2map:框架调用Mapper类中的map(k1,v1)方法,接收<k1,v1>,输出<k2,v2>。有多少个<k1,v1>,map()会被执行多少次。输出<k2,v2>是<hello,1><word,1><hello,1><you,1>                   程序员可以覆盖map(),实现自己的业务逻辑。         1.3分区:框架对map的输出进行分区。分区的目的是确定哪些<k2,v2>进入哪个reduce task。默认,只有一个分区。可以手动设置(0,1,2等后面会涉及到)         1.4排序分组:框架对不同分区中的<k2,v2>进行排序、分组。             排序是按照k2进行排序。结果是<hello,1><hello,1><word,1><you,1>                   分组指的是相同k2的v2分到一个组中。分组不会减少<k2,v2>的数量。         1.5combiner:可以在map task中对<k2,{v2}>执行reduce归约。<hello,{1,1}><word,{1}><you,{1}>         1.6写入本地:框架对map的输出写入到linux本地磁盘。2.Reduce Task         2.1shuffle:框架根据map不同的分区中的数据,通过网络copy到不同的reduce节点。         2.2合并排序分组:每个reduce会把多个map传来的<k2,v2>进行合并、排序、分组。         2.3reduce:框架调用reduce(k2,v2s)。有多少个分组,就会执行多少次reduce函数。         2.4写入HDFS:框架对reduce的输出写入到HDFS中。
单词计数代码示例:
package mp.wordcount;
import java.io.IOException;
importorg.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** * *-------------------------------------------- * 原始数据 *hello    you *hello    me *--------------------- * 结果如下 *hello    2 * me       1 *you      1 * *------------------------------------------------------------- * * */public class HelloWordCountApp2 {         /**          * 驱动代码          * @param args          * @throws Exception           */         publicstatic void main(String[] args) throws Exception {                   //从命令行传入输入路径                   StringinputPath = args;                   //从命令行传入输出目录                   PathoutputDir = new Path(args);

                   Configurationconf = new Configuration();                   //表示job名称,可以自定义,一般是类名                   StringjobName = HelloWordCountApp2.class.getSimpleName();                   //把所有的相关内容都封装到job中                   Jobjob = Job.getInstance(conf, jobName);                   //打成jar运行必备代码                   job.setJarByClass(HelloWordCountApp2.class);
                   //设置输入路径                   FileInputFormat.setInputPaths(job,inputPath);                   //设置输出目录                   FileOutputFormat.setOutputPath(job,outputDir);
                   //设置自定义mapper类                   job.setMapperClass(HelloWordCountMapper.class);                   //指定k2,v2类型                   job.setMapOutputKeyClass(Text.class);                   job.setMapOutputValueClass(LongWritable.class);

                   //设置自定义reduce类                   job.setReducerClass(HelloWordCountReducer.class);                   //指定k3,v3类型                   job.setOutputKeyClass(Text.class);                   job.setOutputValueClass(LongWritable.class);
                   //提交给yarn运行,等待结束                   job.waitForCompletion(true);         }


         /**          * map过程。          * 在这里,程序员继承Mapper,覆盖map(...)方法。          * 该类在运行的时候,称作map task,是一个java进程。          *----------------------------------------------------          * map()全部执行完后,产生的<k2,v2>有4个,即<hello,1><you,1><hello,1><me,1>。          * 排序后是<hello,1><hello,1><me,1><you,1>。          * 分组后是<hello,{1,1}><me,{1}><you,{1}>。          *           *          */         publicstatic class HelloWordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{                   Textk2 = new Text();                   LongWritablev2 = new LongWritable();
                   /**                  * 前面已经有拆分完成的<k1,v1>。调用map()一次方法,就处理一个<k1,v1>对。                  *                   * 在map()方法,拆分每一行,得到每个单词,每个单词(不是每个不同的单词)的出现次数是1。                  * 构造<k2,v2>,k2表示单词,v2表示出现次数1。                  */                   @Override                   protectedvoid map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,LongWritable>.Context context)                                     throwsIOException, InterruptedException {                            //因为要对每行内容做拆分,需要调用String.split(),所以需要把Text转行成String。                            Stringline = value.toString();                            //拆分每行内容,结果是单词的数组                            String[]splited = line.split("\t");                            //循环数组,取每个单词。在for循环中构造<k2,v2>                            for(String word : splited) {                                     k2.set(word);                                     v2.set(1L);                                     //把<k2,v2>写出去,相当于调用return语句                                     context.write(k2,v2);                            }                   }         }

         /**          * reduce过程          *           * reduce端接收的是map的输出,即4个<k2,v2>,3个分组。          * 在reduce执行之前,reduce端合并、排序、分组<k2,v2>。          * 在reduce()调用之前,有3个分组,即<hello,{1,1}><me,{1}><you,{1}>          * 一次reduce()执行,处理1个分组。所以说,执行3次reduce()。          * ------------------------------------------------------------------          * reduce task执行结束后,框架会把reduce输出的<k3,v3>写入到HDFS中          *          */         publicstatic class HelloWordCountReducer extends Reducer<Text, LongWritable, Text,LongWritable>{                   LongWritablev3 = new LongWritable();
                   /**                  * k2表示每个不同的单词                  * v2s表示每个不同的单词的出现次数                  * 在reduce()中,只需要汇总v2s中的出现次数就行。                  */                   @Override                   protectedvoid reduce(Text k2, Iterable<LongWritable> v2s,                                     Reducer<Text,LongWritable, Text, LongWritable>.Context context) throws IOException,InterruptedException {                            //sum表示当前单词k2出现的总次数                            longsum = 0L;                            for(LongWritable v2 : v2s) {                                     sum+= v2.get();                            }                            //k3表示当前不同的单词,与k2含义相同
                            v3.set(sum);                            context.write(k2,v3);                   }         }}
打成jar包在hdfs中执行yarn jar jar包名 /hello /out          /hello就是要统计的单词文本,是上传到hdfs上的;/out输出路径,如果已存在可以删除,也可在代码中删除执行结果:

代码中用到序列化Hadoop的序列化格式:Writablehadoop序列化的目的是什么?mapreduce运行过程中,产生大量的磁盘io和网络io。序列化性能的差异,会对job的运行效率产生非常大的影响。因此,高效率的序列化机制可以提高效率。



部分代码类方法解析1.InputFormat里面有2个方法,一个是getSplits(),一个是createRecordReader()。   在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入。每一个InputSplit都有一个RecordReader,作用是把InputSplit中的数据解析成Record,即<k1,v1>.TextInputFormat中的RecordReader是LineRecordReader,每一行解析成一个<k1,v1>。其中,k1表示偏移量,v1表示行文本内容 FileInputFormat类中分析了getSplits()。 TextInputFormat类是MR默认的输入处理类。主要分析的是LineRecordReader。 Maper类的源代码中,有setup、cleanup、map、run四个重要的方法。
2.SequenceFileInputFormat专门处理类型是SequenceFile格式的输入文件。如果是大量的小文件作为输入文件,那么会产生大量的map task。如果把大量的小文件转换为SequenceFile格式,那么会产生非常少的maptask。如果SequenceFile使用压缩,那么maptask执行时间会更短。
job.setInputFormatClass(SequenceFileInputFormat.class);
3.CombineFileInputFormat作用是把大量的小文件交给一个map task。在这里,输入依然是小文件,但是会由非常少的map task运行。
job.setInputFormatClass(CombineSmallFilesInputFormat.class);--------------------------------------------------------------------------------------4.OutputFormat里面有个很重要的类,叫做RecordWriter。5.使用MultipleOutputs可以自定义输出的文件名。---------------------------------------------------------------------------------------6.在map task或者reduce task中使用第三方的jar包。
首先把第三方jar包上传到hdfs中,然后调用job.addArchiveToClasspath(...)
7.分区partitioiner默认的分区是1个,分区的实现是HashPartitioner。   什么时候用到分区?当需要把不同的数据按照不同的类型输出时,使用分区,例如不同的省份的安电话区号分别输出到不同的文件中。自定义分区类继承HashPartitioner例如extends HashPartitioner<Text,FlowWritable>覆盖getPartition方法(方法内根据业务实现分区)。
job.setPartitionClass(.....) job.setNumReduceTasks(...)
8.归约combiner发生在map端的reduce操作。作用是减少map端的输出,减少shuffle过程中网络传输的数据量,提高作业的执行效率。 combiner仅仅是单个map task的reduce,没有对全部map的输出做reduce。 job.setCombinerClass(....)
9.排序sort(见下面代码)两种比较方式,一种是调用k2的compareTo(...)完成比较,第二种是自定义类extendsWritableComparator job.setSortComparatorClass(....)
注意:extendsWritableComparator的子类一定要有个无参构造方法,在该构造方法中,调用父类的有2个参数的构造方法。
10.分组grouping(例如日志采集中不同主机的操作记录,可以按照主机ip分组做其他处理)   当排序逻辑与分组逻辑不一样时,就需要自定义分组。自定义类extends WritableComparatorjob.setGroupingComparatorClass(...)

对于排序 示例代码例如需求对列排序如果第一行相同按照第二行大小排序1   22   11   12   21       3要求输出结果1   11   21       32       12       2思路一将第一列数和第二列数封装到自定义的一个类TwoInt 中进行自定义排序CustomSortComparator extends WritableComparator思路二按照 k2排序方法1:import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
importorg.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;importorg.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** * 利用k2的compareTo()实现排序 * * */public class SortApp1 {         /**          * 驱动代码          * @param args          * @throws Exception           */         publicstatic void main(String[] args) throws Exception {                   //从命令行传入输入路径                   StringinputPath = args;                   //从命令行传入输出目录                   PathoutputDir = new Path(args);                   //reduce数量                   IntegernumReduceTasks = Integer.parseInt(args==null?"1":args);


                   Configurationconf = new Configuration();                   outputDir.getFileSystem(conf).delete(outputDir,true);                   //表示job名称,可以自定义,一般是类名                   StringjobName = SortApp1.class.getSimpleName();                   //把所有的相关内容都封装到job中                   Jobjob = Job.getInstance(conf, jobName);                   //打成jar运行必备代码                   job.setJarByClass(SortApp1.class);
                   //设置输入路径                   FileInputFormat.setInputPaths(job,inputPath);                   //设置输出目录                   FileOutputFormat.setOutputPath(job,outputDir);
                   //设置自定义mapper类                   job.setMapperClass(SortMapper.class);                   //指定k2,v2类型                   job.setMapOutputKeyClass(TwoInt.class);                   job.setMapOutputValueClass(NullWritable.class);
                   job.setNumReduceTasks(numReduceTasks);
                   //设置自定义reduce类                   job.setReducerClass(SortReducer.class);                   //指定k3,v3类型                   job.setOutputKeyClass(TwoInt.class);                   job.setOutputValueClass(NullWritable.class);
                   //提交给yarn运行,等待结束                   job.waitForCompletion(true);         }

         publicstatic class SortMapper extends Mapper<LongWritable, Text, TwoInt,NullWritable>{                   TwoIntk2 = new TwoInt();                   @Override                   protectedvoid map(LongWritable key, Text value,                                     Mapper<LongWritable,Text, TwoInt, NullWritable>.Context context)                                                      throwsIOException, InterruptedException {                            Stringline = value.toString();                            String[]splited = line.split("\t");
                            k2.set(Integer.parseInt(splited),Integer.parseInt(splited));                            context.write(k2,NullWritable.get());                   }         }
         publicstatic class SortReducer extends Reducer<TwoInt, NullWritable, TwoInt,NullWritable>{                   @Override                   protectedvoid reduce(TwoInt k2, Iterable<NullWritable> v2s,                                     Reducer<TwoInt,NullWritable, TwoInt, NullWritable>.Context context)                                                      throwsIOException, InterruptedException {                            context.write(k2,NullWritable.get());                   }         }

         publicstatic class TwoInt implements WritableComparable<TwoInt>{                   privateInteger first;                   privateInteger second;
                   publicTwoInt() {                            super();                   }
                   publicvoid set(Integer first, Integer second) {                            this.first= first;                            this.second= second;                   }
                   publicvoid write(DataOutput out) throws IOException {                            out.writeInt(this.first);                            out.writeInt(this.second);                   }
                   publicvoid readFields(DataInput in) throws IOException {                            this.first=in.readInt();                            this.second= in.readInt();                   }
                   publicint compareTo(TwoInt o) {                            intret1 = first.compareTo(o.getFirst());                            if(ret1==0){                                     returnsecond.compareTo(o.getSecond());                            }else{                                     returnret1;                            }                   }
                   publicInteger getFirst() {                            returnfirst;                   }
                   publicvoid setFirst(Integer first) {                            this.first= first;                   }
                   publicInteger getSecond() {                            returnsecond;                   }
                   publicvoid setSecond(Integer second) {                            this.second= second;                   }
                   @Override                   publicint hashCode() {                            finalint prime = 31;                            intresult = 1;                            result= prime * result + ((first == null) ? 0 : first.hashCode());                            result= prime * result + ((second == null) ? 0 : second.hashCode());                            returnresult;                   }
                   @Override                   publicboolean equals(Object obj) {                            if(this == obj)                                     returntrue;                            if(obj == null)                                     returnfalse;                            if(getClass() != obj.getClass())                                     returnfalse;                            TwoIntother = (TwoInt) obj;                            if(first == null) {                                     if(other.first != null)                                             returnfalse;                            }else if (!first.equals(other.first))                                     returnfalse;                            if(second == null) {                                     if(other.second != null)                                             returnfalse;                            }else if (!second.equals(other.second))                                     returnfalse;                            returntrue;                   }
                   @Override                   publicString toString() {                            returnfirst + "\t" + second;                   }
         }
}
方法2:
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
importorg.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;importorg.apache.hadoop.io.WritableComparable;importorg.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** * 自定义sort类,实现k2的排序 * * */public class SortApp2 {         /**          * 驱动代码          * @param args          * @throws Exception           */         publicstatic void main(String[] args) throws Exception {                   //从命令行传入输入路径                   StringinputPath = args;                   //从命令行传入输出目录                   PathoutputDir = new Path(args);                   //reduce数量                   IntegernumReduceTasks = Integer.parseInt(args==null?"1":args);


                   Configurationconf = new Configuration();                   outputDir.getFileSystem(conf).delete(outputDir,true);                   //表示job名称,可以自定义,一般是类名                   StringjobName = SortApp2.class.getSimpleName();                   //把所有的相关内容都封装到job中                   Jobjob = Job.getInstance(conf, jobName);                   //打成jar运行必备代码                   job.setJarByClass(SortApp2.class);
                   //设置输入路径                   FileInputFormat.setInputPaths(job,inputPath);                   //设置输出目录                   FileOutputFormat.setOutputPath(job,outputDir);
                   //设置自定义mapper类                   job.setMapperClass(SortMapper.class);                   //指定k2,v2类型                   job.setMapOutputKeyClass(TwoInt.class);                   job.setMapOutputValueClass(NullWritable.class);
                   //自定义比较类                   job.setSortComparatorClass(CustomSortComparator.class);                   job.setNumReduceTasks(numReduceTasks);
                   //设置自定义reduce类                   job.setReducerClass(SortReducer.class);                   //指定k3,v3类型                   job.setOutputKeyClass(TwoInt.class);                   job.setOutputValueClass(NullWritable.class);
                   //提交给yarn运行,等待结束                   job.waitForCompletion(true);         }

         publicstatic class SortMapper extends Mapper<LongWritable, Text, TwoInt, NullWritable>{                   TwoIntk2 = new TwoInt();                   @Override                   protectedvoid map(LongWritable key, Text value,                                     Mapper<LongWritable,Text, TwoInt, NullWritable>.Context context)                                                      throwsIOException, InterruptedException {                            Stringline = value.toString();                            String[]splited = line.split("\t");
                            k2.set(Integer.parseInt(splited),Integer.parseInt(splited));                            context.write(k2,NullWritable.get());                   }         }
         publicstatic class SortReducer extends Reducer<TwoInt, NullWritable, TwoInt,NullWritable>{                   @Override                   protectedvoid reduce(TwoInt k2, Iterable<NullWritable> v2s,                                     Reducer<TwoInt,NullWritable, TwoInt, NullWritable>.Context context)                                                      throwsIOException, InterruptedException {                            context.write(k2,NullWritable.get());                   }         }
         publicstatic class CustomSortComparator extends WritableComparator{
                   /**                  * 必须有无参构造方法,在方法内部,调用父类的含有2个形参的构造方法。                  * 父类构造方法的第二个参数为true                  */                   publicCustomSortComparator() {                            super(TwoInt.class,true);                   }
                   @Override                   publicint compare(WritableComparable a, WritableComparable b) {                            TwoIntaa = (TwoInt) a;                            TwoIntbb = (TwoInt) b;                            intret1 = aa.getFirst().compareTo(bb.getFirst());                            if(ret1==0){                                     returnaa.getSecond().compareTo(bb.getSecond());                            }else{                                     returnret1;                            }                   }         }
         publicstatic class TwoInt implements WritableComparable<TwoInt>{                   privateInteger first;                   privateInteger second;
                   publicTwoInt() {                            super();                   }
                   publicvoid set(Integer first, Integer second) {                            this.first= first;                            this.second= second;                   }
                   publicvoid write(DataOutput out) throws IOException {                            out.writeInt(this.first);                            out.writeInt(this.second);                   }
                   publicvoid readFields(DataInput in) throws IOException {                            this.first=in.readInt();                            this.second= in.readInt();                   }
                   publicint compareTo(TwoInt o) {                            return0;                   }
                   publicInteger getFirst() {                            returnfirst;                   }
                   publicvoid setFirst(Integer first) {                            this.first= first;                   }
                   publicInteger getSecond() {                            returnsecond;                   }
                   publicvoid setSecond(Integer second) {                            this.second= second;                   }
                   @Override                   publicint hashCode() {                            finalint prime = 31;                            intresult = 1;                            result= prime * result + ((first == null) ? 0 : first.hashCode());                            result= prime * result + ((second == null) ? 0 : second.hashCode());                            returnresult;                   }
                   @Override                   publicboolean equals(Object obj) {                            if(this == obj)                                     returntrue;                            if(obj == null)                                     returnfalse;                            if(getClass() != obj.getClass())                                     returnfalse;                            TwoIntother = (TwoInt) obj;                            if(first == null) {                                     if(other.first != null)                                             returnfalse;                            }else if (!first.equals(other.first))                                     returnfalse;                            if(second == null) {                                     if(other.second != null)                                             returnfalse;                            }else if (!second.equals(other.second))                                     returnfalse;                            returntrue;                   }
                   @Override                   publicString toString() {                            returnfirst + "\t" + second;                   }
         }
}
;运行结果:


补充:在map 和reduce之间有个shuffle过程,可以简单的理解shuffle是将map的输出传到reduce中去。核心思想是:map中有个内存缓存区,存储着mapd的输出,存满了就写到文件中,所以的map都记录完了,就把产生的所有文件合并到一个文件中。reduce通过http得到map输出文件。在这里一篇博客有详细的讲解,http://langyu.iteye.com/blog/992916 。希望博主不要怪罪
页: [1]
查看完整版本: Hadoop学习之mapreduce及示例代码