package org.apache.test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WC {
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private HashSet<String> keyword;
        private Path[] localpaths;

        // Loads the keyword file(s) from the distributed cache before any map() calls.
        // The stack trace below (WC.java:38) points into this method.
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            keyword = new HashSet<String>();
            Configuration conf = context.getConfiguration();
            localpaths = DistributedCache.getLocalCacheFiles(conf);
            for (int i = 0; i < localpaths.length; i++) {
                String akeyword;
                BufferedReader br = new BufferedReader(new FileReader(localpaths[i].toString()));
                while ((akeyword = br.readLine()) != null) {
                    keyword.add(akeyword);
                }
                br.close();
            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                String word = token.nextToken();
                // Note: as written, the rest of the line is skipped as soon as a keyword is seen.
                if (keyword.contains(word)) {
                    return;
                }
                context.write(new Text(word), one);
            }
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts emitted for each word.
            int sum = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: <in> <out> <URI>");
            System.exit(3);
        }
        // Register the keyword file with the distributed cache (as an archive).
        DistributedCache.addCacheArchive(new Path(otherArgs[2]).toUri(), conf);
        Job job = new Job(conf, "WC");
        job.setJarByClass(WC.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Error output:
2016-05-20 10:43:06,102 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-05-20 10:43:06,134 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local692170768_0001 running in uber mode : false
2016-05-20 10:43:06,137 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 0% reduce 0%
2016-05-20 10:43:06,169 WARN [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local692170768_0001
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
at org.apache.test.WC$Map.setup(WC.java:38)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2016-05-20 10:43:07,144 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_local692170768_0001 failed with state FAILED due to: NA
2016-05-20 10:43:07,156 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0
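Reading the trace, the NullPointerException is thrown inside Map.setup(), and as far as I can tell the only reference there that can be null is the array returned by DistributedCache.getLocalCacheFiles(conf): keyword is assigned just above it, and a missing keyword file would surface as a FileNotFoundException rather than an NPE. Below is a minimal diagnostic sketch of setup() with a null guard, only to turn the failure into a clearer message; it assumes the null array really is the cause and is not a confirmed fix.

    // Sketch only: reuses the keyword/localpaths fields of the Map class above.
    // Assumption: getLocalCacheFiles() can return null when nothing was registered
    // as a cache *file* (the keyword file above is added with addCacheArchive).
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        keyword = new HashSet<String>();
        Configuration conf = context.getConfiguration();
        localpaths = DistributedCache.getLocalCacheFiles(conf);
        if (localpaths == null) {
            // Replaces the bare NullPointerException with an explicit error message.
            throw new IOException("getLocalCacheFiles() returned null: no cache files are visible to this task");
        }
        for (Path p : localpaths) {
            BufferedReader br = new BufferedReader(new FileReader(p.toString()));
            String akeyword;
            while ((akeyword = br.readLine()) != null) {
                keyword.add(akeyword);
            }
            br.close();
        }
    }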