
Hadoop (latest-API ChainMapper chained MapReduce: code implementation and how it works)


Guiding questions

1. What functionality does this post implement?
2. What problems came up along the way?
3. How does the author understand how ChainMapper works?







[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// First mapper in the chain: drops the record whose student ID is 100
// and emits (studentId, whole line) for every other record.
public class MapClass1 extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
                String[] citation = ivalue.toString().split(" ");
                if (!citation[0].equals("100")) {
                        context.write(new Text(citation[0]), new Text(ivalue));
                }
        }

}
[/mw_shl_code]

[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Second mapper in the chain: it consumes the output of MapClass1, so the key
// is already the student ID. Drops the record whose ID is 101.
public class MapClass2 extends Mapper<Text, Text, Text, Text> {

        @Override
        public void map(Text ikey, Text ivalue, Context context) throws IOException, InterruptedException {
                if (!ikey.toString().equals("101")) {
                        context.write(ikey, ivalue);
                }
        }

}
[/mw_shl_code]

[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: passes the surviving records straight through. Writing a null key
// makes TextOutputFormat print only the value, so the output contains just the lines.
public class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                for (Text val : values) {
                        context.write(null, val);
                }
        }

}
[/mw_shl_code]
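
Writing a null key works with TextOutputFormat, but if you would rather keep the types explicit, here is a minimal alternative sketch (mine, not part of the original post) that uses NullWritable instead. The class name ReduceNullKey is hypothetical, and the driver would then also need job.setOutputKeyClass(NullWritable.class).

[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical alternative sketch (not from the original post): emit a NullWritable
// key instead of null. TextOutputFormat still prints only the value.
public class ReduceNullKey extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                for (Text val : values) {
                        context.write(NullWritable.get(), val);
                }
        }

}
[/mw_shl_code]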

[mw_shl_code=java,true]package com.zzg.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Drive {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "JobName");
                job.setJarByClass(Drive.class);

                // Each mapper in the chain gets its own (empty) Configuration. The new-API
                // addMapper takes: job, mapper class, input key/value classes, output
                // key/value classes, and the per-mapper Configuration.
                Configuration map1Conf = new Configuration(false);
                ChainMapper.addMapper(job, MapClass1.class, LongWritable.class, Text.class, Text.class, Text.class, map1Conf);
                Configuration map2Conf = new Configuration(false);
                ChainMapper.addMapper(job, MapClass2.class, Text.class, Text.class, Text.class, Text.class, map2Conf);
                Configuration map3Conf = new Configuration(false); // never used anywhere; leftover

                job.setReducerClass(Reduce.class);
                FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/ChainMapper.txt"));
                FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output/test5"));

                if (!job.waitForCompletion(true))
                        return;
        }

}
[/mw_shl_code]


[mw_shl_code=bash,true]ChainMapper.txt
100 jack 90 23  78
101 zzg 85  21  32
102 qw 60  12  36
[/mw_shl_code]

[mw_shl_code=bash,true] WARN [main] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO [main] - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO [main] - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN [main] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN [main] - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
INFO [main] - Total input paths to process : 1
INFO [main] - number of splits:1
INFO [main] - Submitting tokens for job: job_local1466046461_0001
INFO [main] - The url to track the job: http://localhost:8080/
INFO [main] - Running job: job_local1466046461_0001
INFO [Thread-12] - OutputCommitter set in config null
INFO [Thread-12] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO [Thread-12] - Waiting for map tasks
INFO [LocalJobRunner Map Task Executor #0] - Starting task: attempt_local1466046461_0001_m_000000_0
INFO [LocalJobRunner Map Task Executor #0] -  Using ResourceCalculatorProcessTree : [ ]
INFO [LocalJobRunner Map Task Executor #0] - Processing split: hdfs://localhost:9000/input/ChainMapper.txt:0+35
INFO [LocalJobRunner Map Task Executor #0] - (EQUATOR) 0 kvi 26214396(104857584)
INFO [LocalJobRunner Map Task Executor #0] - mapreduce.task.io.sort.mb: 100
INFO [LocalJobRunner Map Task Executor #0] - soft limit at 83886080
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396; length = 6553600
INFO [LocalJobRunner Map Task Executor #0] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
INFO [main] - Job job_local1466046461_0001 running in uber mode : false
INFO [main] -  map 0% reduce 0%
INFO [LocalJobRunner Map Task Executor #0] -
INFO [LocalJobRunner Map Task Executor #0] - Starting flush of map output
INFO [LocalJobRunner Map Task Executor #0] - Spilling map output
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufend = 16; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600
INFO [LocalJobRunner Map Task Executor #0] - Finished spill 0
INFO [LocalJobRunner Map Task Executor #0] - Task:attempt_local1466046461_0001_m_000000_0 is done. And is in the process of committing
INFO [LocalJobRunner Map Task Executor #0] - map
INFO [LocalJobRunner Map Task Executor #0] - Task 'attempt_local1466046461_0001_m_000000_0' done.
INFO [LocalJobRunner Map Task Executor #0] - Finishing task: attempt_local1466046461_0001_m_000000_0
INFO [Thread-12] - map task executor complete.
INFO [Thread-12] - Waiting for reduce tasks
INFO [pool-6-thread-1] - Starting task: attempt_local1466046461_0001_r_000000_0
INFO [pool-6-thread-1] -  Using ResourceCalculatorProcessTree : [ ]
INFO [pool-6-thread-1] - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2be55f70
INFO [pool-6-thread-1] - MergerManager: memoryLimit=503893184, maxSingleShuffleLimit=125973296, mergeThreshold=332569504, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO [EventFetcher for fetching Map Completion Events] - attempt_local1466046461_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO [localfetcher#1] - localfetcher#1 about to shuffle output of map attempt_local1466046461_0001_m_000000_0 decomp: 20 len: 24 to MEMORY
INFO [localfetcher#1] - Read 20 bytes from map-output for attempt_local1466046461_0001_m_000000_0
INFO [localfetcher#1] - closeInMemoryFile -> map-output of size: 20, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->20
INFO [EventFetcher for fetching Map Completion Events] - EventFetcher is interrupted.. Returning
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO [main] -  map 100% reduce 0%
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - Merged 1 segments, 20 bytes to disk to satisfy reduce memory limit
INFO [pool-6-thread-1] - Merging 1 files, 24 bytes from disk
INFO [pool-6-thread-1] - Merging 0 segments, 0 bytes from memory into reduce
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
INFO [pool-6-thread-1] - Task:attempt_local1466046461_0001_r_000000_0 is done. And is in the process of committing
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - Task attempt_local1466046461_0001_r_000000_0 is allowed to commit now
INFO [pool-6-thread-1] - Saved output of task 'attempt_local1466046461_0001_r_000000_0' to hdfs://localhost:9000/output/test5/_temporary/0/task_local1466046461_0001_r_000000
INFO [pool-6-thread-1] - reduce > reduce
INFO [pool-6-thread-1] - Task 'attempt_local1466046461_0001_r_000000_0' done.
INFO [pool-6-thread-1] - Finishing task: attempt_local1466046461_0001_r_000000_0
INFO [Thread-12] - reduce task executor complete.
INFO [main] -  map 100% reduce 100%
INFO [main] - Job job_local1466046461_0001 completed successfully
INFO [main] - Counters: 38
        File System Counters
                FILE: Number of bytes read=388
                FILE: Number of bytes written=498456
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70
                HDFS: Number of bytes written=12
                HDFS: Number of read operations=13
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Map-Reduce Framework
                Map input records=3
                Map output records=1
                Map output bytes=16
                Map output materialized bytes=24
                Input split bytes=108
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=24
                Reduce input records=1
                Reduce output records=1
                Spilled Records=2
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=606
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=344981504
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=35
        File Output Format Counters
                Bytes Written=12
[/mw_shl_code]



The code is simple: the first mapper drops the record whose student ID is 100 and the second drops the record whose ID is 101, so only student 102 reaches the output. The point is simply to demonstrate chaining mappers inside one MapReduce job with ChainMapper. Note that the write-ups you find online mostly use the old API, so I wrote a new-API version for reference.

At first I imported the wrong package instead of the correct org.apache.hadoop.mapreduce.lib.chain.ChainMapper. Once you import the right one, the IDE hints are enough to let you write the latest API. Compared with the old version the parameters changed quite a bit: the (boolean byValue) argument is gone, and (JobConf mapperConf) has been replaced by a Configuration.
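
For reference, here is a rough side-by-side of the two addMapper calls. The old-API call is shown only in comments, because the mappers in this post extend the new-API Mapper and would not compile against the old interface; job and map1Conf are the variables from the Drive class above, and you should double-check the exact signatures against your own Hadoop version.

[mw_shl_code=java,true]// Old mapred API (org.apache.hadoop.mapred.lib.ChainMapper), shown as a comment only:
//   ChainMapper.addMapper(jobConf, MapClass1.class,
//           LongWritable.class, Text.class, Text.class, Text.class,
//           false /* byValue */, map1Conf /* JobConf */);

// New mapreduce API (org.apache.hadoop.mapreduce.lib.chain.ChainMapper):
// the same arguments, minus byValue, with a plain Configuration instead of a JobConf.
ChainMapper.addMapper(job, MapClass1.class,
        LongWritable.class, Text.class, Text.class, Text.class,
        map1Conf /* Configuration */);
[/mw_shl_code]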
Let me also explain a bit of how it works:

ChainMapper/ChainReducer were introduced mainly to support a linear chain of mappers. That is, the Map or Reduce phase can contain several Mappers that behave like a Linux pipe: the output of one mapper is redirected straight into the input of the next, forming a pipeline. Here the output of MapClass1 becomes the input of MapClass2, so you have to pay close attention to matching the key/value types between consecutive stages. Also note that for any single MapReduce job, the Map and Reduce phases may contain any number of Mappers, but at most one Reducer. The diagram below illustrates the idea.
[Diagram: ChainMapper pipeline, several mappers chained around a single reducer]
Thought about carefully, this is just ordinary MapReduce with the ability to link several Mappers together. The chained mappers run in the order in which you add them in the driver; the single reducer runs after the mappers added with ChainMapper, and ChainReducer lets you append further mappers after it, as sketched below. That's all for today; keep an eye on my blog.
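
To also register the reducer through the chain API, and optionally append mappers after it, the new API provides ChainReducer. Here is a minimal driver sketch; it is my own sketch rather than the job the log above came from, and the class name ChainDrive, the output path, and the commented-out MapClass3 are hypothetical.

[mw_shl_code=java,true]package com.zzg.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: not the driver that produced the log above.
public class ChainDrive {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "ChainJob");
                job.setJarByClass(ChainDrive.class);

                // Mappers that run before the reducer, in the order they are added.
                ChainMapper.addMapper(job, MapClass1.class, LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
                ChainMapper.addMapper(job, MapClass2.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));

                // The single reducer of the job, registered through ChainReducer.
                ChainReducer.setReducer(job, Reduce.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));

                // Further mappers could be appended after the reducer, e.g.
                // ChainReducer.addMapper(job, MapClass3.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
                // MapClass3 is hypothetical and not part of this post; a reducer that feeds a
                // downstream mapper should emit real (non-null) keys.

                FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/ChainMapper.txt"));
                FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output/chain"));
                job.waitForCompletion(true);
        }

}
[/mw_shl_code]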




Author:
AC_great

Comments

peterzor posted on 2015-9-17 09:15:27
Nice, learned something.


jlon posted on 2015-9-19 09:18:05
Great work, OP. It would be even better if you could also write a chained-Reduce example!


szcountryboy posted on 2015-9-22 11:46:57
What is map3Conf for? From the code it looks like only the record with ID 102 is kept:

map1   if(!citation[0].equals("100"))        ===> passes the 101 and 102 records
map2   if(!ikey.toString().equals("101"))    ===> of 101 and 102, only keys other than 101 pass, leaving 102

Is that right?
