public class px {
/**
* @param args
*/
public static class SumMap extends Mapper<Object,Text,Text,IntWritable>{
private Text word;
private IntWritable one=new IntWritable(1);
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
String aword;
StringTokenizer token=new StringTokenizer(value.toString());
while(token.hasMoreTokens()){
aword=token.nextToken();
word.set(aword);
}
context.write(word, one);
}
}
public static class SumReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
int sum=0;
for(IntWritable val : values){
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static class sortMap extends Mapper<Object,Text,IntWritable,Text>{
private IntWritable k=new IntWritable();
private Text t=new Text();
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
String[] arr=value.toString().split(" ");
if(arr.length==2){
String kv=arr[0];
k.set(Integer.parseInt(kv));
String tv=arr[1];
t.set(tv);
}
context.write(k, t);
}
}
public static class sortReduce extends Reducer<IntWritable,Text,Text,IntWritable>{
public void reduce(IntWritable key,Iterable<Text>values,Context context) throws IOException, InterruptedException{
String line=values.toString();
context.write(new Text(line), key);
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration jobconf=new Configuration();//第一个Maperduc
Job job1=new Job(jobconf,"job1");
job1.setJarByClass(px.class);
job1.setMapperClass(SumMap.class);
job1.setCombinerClass(SumReduce.class);
job1.setReducerClass(SumReduce.class);
//job1.setMapOutputKeyClass(IntWritable.class);
//job1.setMapOutputValueClass(Text.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path("hdfs://localhost:9000/test/test1"));
ControlledJob cjob1=new ControlledJob(jobconf);
cjob1.setJob(job1);
FileOutputFormat.setOutputPath(job1, new Path("hdfs://localhost:9000/output/px1"));
job1.waitForCompletion(true) ;
//Configuration job2conf=new Configuration();//第二个mapreduce
Job job2=new Job(jobconf,"job2");
job2.setJarByClass(px.class);
job2.setMapperClass(sortMap.class);
job2.setReducerClass(sortReduce.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, new Path("hdfs://localhost:9000/output/px1"));
ControlledJob cjob2=new ControlledJob(jobconf);
cjob2.setJob(job2);
FileOutputFormat.setOutputPath(job2, new Path("hdfs://locahost:9000/output/px2"));
job2.waitForCompletion(true);
cjob2.addDependingJob(cjob1);
JobControl jobCtrl=new JobControl("myctrl");
//ctrljob1.addDependingJob(ctrljob2);// job2在job1完成后,才可以启动
//添加到总的JobControl里,进行控制
jobCtrl.addJob(cjob1);
jobCtrl.addJob(cjob2);
}
}
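Error output (a NullPointerException in SumMap.map, followed by connection retries to the misspelled host "locahost"):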
2016-05-22 15:48:57,425 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(976)) - soft limit at 83886080
2016-05-22 15:48:57,426 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(977)) - bufstart = 0; bufvoid = 104857600
2016-05-22 15:48:57,426 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(978)) - kvstart = 26214396; length = 6553600
2016-05-22 15:48:57,593 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1437)) - Starting flush of map output
2016-05-22 15:48:57,643 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-05-22 15:48:57,807 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local427298243_0001 running in uber mode : false
2016-05-22 15:48:57,810 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 0% reduce 0%
2016-05-22 15:48:57,897 WARN [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local427298243_0001
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
at px$SumMap.map(px.java:30)
at px$SumMap.map(px.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2016-05-22 15:48:58,815 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_local427298243_0001 failed with state FAILED due to: NA
2016-05-22 15:48:58,828 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0
2016-05-22 15:48:58,945 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(71)) - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2016-05-22 15:49:03,182 INFO [communication thread] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map > sort
2016-05-22 15:49:18,969 INFO [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 0 time(s); maxRetries=45
2016-05-22 15:49:39,008 INFO [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 1 time(s); maxRetries=45
2016-05-22 15:49:59,028 INFO [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 2 time(s); maxRetries=45
2016-05-22 15:50:19,054 INFO [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 3 time(s); maxRetries=45
|
|