Guiding questions
1. What does this article implement?
2. What problems were encountered along the way?
3. How does the author understand the principle behind ChainMapper?
[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// First Mapper in the chain: reads the raw text lines and drops the record
// whose student ID (the first field) is "100".
public class MapClass1 extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String[] citation = ivalue.toString().split(" ");
        if (!citation[0].equals("100")) {
            // Emit the student ID as the key and the whole line as the value.
            context.write(new Text(citation[0]), new Text(ivalue));
        }
    }
}
[/mw_shl_code]
[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Second Mapper in the chain: receives the (ID, line) pairs emitted by
// MapClass1 and drops the record whose ID is "101".
public class MapClass2 extends Mapper<Text, Text, Text, Text> {

    @Override
    public void map(Text ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        if (!ikey.toString().equals("101")) {
            context.write(ikey, ivalue);
        }
    }
}
[/mw_shl_code]
[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Identity-style Reducer: writes every surviving record with a null key,
// so TextOutputFormat emits only the value (the original line).
public class Reduce extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(null, val);
        }
    }
}
[/mw_shl_code]
[mw_shl_code=java,true]package com.zzg.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Drive {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(Drive.class);

        // Chain the two Mappers: MapClass1's output becomes MapClass2's input.
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass1.class, LongWritable.class, Text.class,
                Text.class, Text.class, map1Conf);
        Configuration map2Conf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass2.class, Text.class, Text.class,
                Text.class, Text.class, map2Conf);

        // A single ordinary Reducer follows the mapper chain.
        job.setReducerClass(Reduce.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/ChainMapper.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output/test5"));

        if (!job.waitForCompletion(true))
            return;
    }
}
[/mw_shl_code]
[mw_shl_code=bash,true]ChainMapper.txt
100 jack 90 23 78
101 zzg 85 21 32
102 qw 60 12 36
[/mw_shl_code]
[mw_shl_code=bash,true] WARN [main] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO [main] - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO [main] - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN [main] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN [main] - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO [main] - Total input paths to process : 1
INFO [main] - number of splits:1
INFO [main] - Submitting tokens for job: job_local1466046461_0001
INFO [main] - The url to track the job: http://localhost:8080/
INFO [main] - Running job: job_local1466046461_0001
INFO [Thread-12] - OutputCommitter set in config null
INFO [Thread-12] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO [Thread-12] - Waiting for map tasks
INFO [LocalJobRunner Map Task Executor #0] - Starting task: attempt_local1466046461_0001_m_000000_0
INFO [LocalJobRunner Map Task Executor #0] - Using ResourceCalculatorProcessTree : [ ]
INFO [LocalJobRunner Map Task Executor #0] - Processing split: hdfs://localhost:9000/input/ChainMapper.txt:0+35
INFO [LocalJobRunner Map Task Executor #0] - (EQUATOR) 0 kvi 26214396(104857584)
INFO [LocalJobRunner Map Task Executor #0] - mapreduce.task.io.sort.mb: 100
INFO [LocalJobRunner Map Task Executor #0] - soft limit at 83886080
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396; length = 6553600
INFO [LocalJobRunner Map Task Executor #0] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
INFO [main] - Job job_local1466046461_0001 running in uber mode : false
INFO [main] - map 0% reduce 0%
INFO [LocalJobRunner Map Task Executor #0] -
INFO [LocalJobRunner Map Task Executor #0] - Starting flush of map output
INFO [LocalJobRunner Map Task Executor #0] - Spilling map output
INFO [LocalJobRunner Map Task Executor #0] - bufstart = 0; bufend = 16; bufvoid = 104857600
INFO [LocalJobRunner Map Task Executor #0] - kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600
INFO [LocalJobRunner Map Task Executor #0] - Finished spill 0
INFO [LocalJobRunner Map Task Executor #0] - Task:attempt_local1466046461_0001_m_000000_0 is done. And is in the process of committing
INFO [LocalJobRunner Map Task Executor #0] - map
INFO [LocalJobRunner Map Task Executor #0] - Task 'attempt_local1466046461_0001_m_000000_0' done.
INFO [LocalJobRunner Map Task Executor #0] - Finishing task: attempt_local1466046461_0001_m_000000_0
INFO [Thread-12] - map task executor complete.
INFO [Thread-12] - Waiting for reduce tasks
INFO [pool-6-thread-1] - Starting task: attempt_local1466046461_0001_r_000000_0
INFO [pool-6-thread-1] - Using ResourceCalculatorProcessTree : [ ]
INFO [pool-6-thread-1] - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@2be55f70
INFO [pool-6-thread-1] - MergerManager: memoryLimit=503893184, maxSingleShuffleLimit=125973296, mergeThreshold=332569504, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO [EventFetcher for fetching Map Completion Events] - attempt_local1466046461_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO [localfetcher#1] - localfetcher#1 about to shuffle output of map attempt_local1466046461_0001_m_000000_0 decomp: 20 len: 24 to MEMORY
INFO [localfetcher#1] - Read 20 bytes from map-output for attempt_local1466046461_0001_m_000000_0
INFO [localfetcher#1] - closeInMemoryFile -> map-output of size: 20, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->20
INFO [EventFetcher for fetching Map Completion Events] - EventFetcher is interrupted.. Returning
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO [main] - map 100% reduce 0%
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - Merged 1 segments, 20 bytes to disk to satisfy reduce memory limit
INFO [pool-6-thread-1] - Merging 1 files, 24 bytes from disk
INFO [pool-6-thread-1] - Merging 0 segments, 0 bytes from memory into reduce
INFO [pool-6-thread-1] - Merging 1 sorted segments
INFO [pool-6-thread-1] - Down to the last merge-pass, with 1 segments left of total size: 14 bytes
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
INFO [pool-6-thread-1] - Task:attempt_local1466046461_0001_r_000000_0 is done. And is in the process of committing
INFO [pool-6-thread-1] - 1 / 1 copied.
INFO [pool-6-thread-1] - Task attempt_local1466046461_0001_r_000000_0 is allowed to commit now
INFO [pool-6-thread-1] - Saved output of task 'attempt_local1466046461_0001_r_000000_0' to hdfs://localhost:9000/output/test5/_temporary/0/task_local1466046461_0001_r_000000
INFO [pool-6-thread-1] - reduce > reduce
INFO [pool-6-thread-1] - Task 'attempt_local1466046461_0001_r_000000_0' done.
INFO [pool-6-thread-1] - Finishing task: attempt_local1466046461_0001_r_000000_0
INFO [Thread-12] - reduce task executor complete.
INFO [main] - map 100% reduce 100%
INFO [main] - Job job_local1466046461_0001 completed successfully
INFO [main] - Counters: 38
File System Counters
FILE: Number of bytes read=388
FILE: Number of bytes written=498456
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=70
HDFS: Number of bytes written=12
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=3
Map output records=1
Map output bytes=16
Map output materialized bytes=24
Input split bytes=108
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=24
Reduce input records=1
Reduce output records=1
Spilled Records=2
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=606
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=344981504
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=35
File Output Format Counters
Bytes Written=12
[/mw_shl_code]
The code is simple: it just filters out the students with IDs 100 and 101 and keeps the rest. The main point is to show how ChainMapper chains Mappers together inside one MapReduce job. Note that most of the write-ups online use the old API, so I wrote a version against the new one for reference.
At first I imported the wrong class instead of the correct org.apache.hadoop.mapreduce.lib.chain.ChainMapper. Once that import is right, the IDE hints are enough to let you write code against the latest API.
Compared with the old version the parameters have changed quite a bit: the boolean byValue flag is gone, and the JobConf mapperConf parameter has been replaced by a Configuration. A sketch comparing the two calls follows below.
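For reference, here is a minimal sketch of that difference as I understand it. The ChainApiExample class and the commented-out OldStyleMapper are hypothetical names used only for illustration; verify the exact signatures against the Javadoc of your Hadoop version.
[mw_shl_code=java,true]package com.zzg.test1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

// Hypothetical helper class, only meant to isolate the new-style addMapper call.
public class ChainApiExample {

    public static void addFirstMapper(Job job) throws IOException {
        // New API (org.apache.hadoop.mapreduce.lib.chain.ChainMapper):
        // no boolean byValue flag, and the per-mapper conf is a plain Configuration.
        ChainMapper.addMapper(job, MapClass1.class,
                LongWritable.class, Text.class,   // this mapper's input key/value types
                Text.class, Text.class,           // this mapper's output key/value types
                new Configuration(false));

        // Old API (org.apache.hadoop.mapred.lib.ChainMapper), for comparison only.
        // OldStyleMapper is hypothetical and would have to implement the old
        // org.apache.hadoop.mapred.Mapper interface:
        // ChainMapper.addMapper(jobConf, OldStyleMapper.class,
        //         LongWritable.class, Text.class, Text.class, Text.class,
        //         false /* byValue */, new JobConf(false));
    }
}
[/mw_shl_code]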
A few words about the principle as well:
ChainMapper/ChainReducer were introduced to support linear chains of Mappers. In other words, the map or reduce stage can contain several Mappers that work like a Linux pipeline: the output of one Mapper is redirected straight into the input of the next, forming an assembly line. In this example the output of MapClass1 becomes the input of MapClass2, so you have to pay close attention to how the key/value types of neighbouring links match up. Also note that within a single chained MapReduce job both the map stage and the reduce stage can hold any number of Mappers, but there can be at most one Reducer.
Thought about carefully, it is really just ordinary MapReduce plus the ability to chain multiple Mappers together, as sketched below.
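Purely as an illustration of that idea (this post's job does not actually need it), here is a hedged sketch of how a full chain could be assembled with the new API. ChainPatternSketch and buildChain are made-up names, and reusing MapClass2 after the Reducer is only meant to show the ChainReducer calls.
[mw_shl_code=java,true]package com.zzg.test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

// Hypothetical driver fragment illustrating the full chain pattern
// [MAP+ / REDUCE MAP*]: several Mappers, at most one Reducer, and
// optionally more Mappers running after the Reducer.
public class ChainPatternSketch {

    public static void buildChain(Job job) throws Exception {
        // Map stage: MapClass1 -> MapClass2; each Mapper's output is piped
        // straight into the next Mapper's input.
        ChainMapper.addMapper(job, MapClass1.class,
                LongWritable.class, Text.class, Text.class, Text.class,
                new Configuration(false));
        ChainMapper.addMapper(job, MapClass2.class,
                Text.class, Text.class, Text.class, Text.class,
                new Configuration(false));

        // Reduce stage: exactly one Reducer in the chain...
        ChainReducer.setReducer(job, Reduce.class,
                Text.class, Text.class, Text.class, Text.class,
                new Configuration(false));

        // ...which may be followed by further Mappers that post-process its
        // output. MapClass2 is reused here purely to show the call; any Mapper
        // whose input key/value types match the Reducer's output would do.
        ChainReducer.addMapper(job, MapClass2.class,
                Text.class, Text.class, Text.class, Text.class,
                new Configuration(false));
    }
}
[/mw_shl_code]
In the job above only the ChainMapper part is used, with an ordinary Reducer set via job.setReducerClass.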
The order of the Mappers (and the Reducer) in the chain simply follows the order in which your driver registers them. That's all for today; feel free to keep following my blog.
Author: AC_great