分享

求助!!Mapreduce 使用distributedCache 的问题

zqy 发表于 2016-5-20 11:34:17 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 8926
public class WC {

        /**
         * @param args
         */
        public static class Map extends Mapper<Object,Text,Text,IntWritable>{
                private IntWritable one=new IntWritable(1);
                private HashSet<String> keyword;
            private Path[] localpaths;
                public void setup (Context context) throws IOException,NullPointerException{
                        keyword=new HashSet<String>();
                        Configuration conf=context.getConfiguration();
                        localpaths=DistributedCache.getLocalCacheFiles(conf);
                        for(int i=0;i<localpaths.length;i++){
                                String akeyword;
                                BufferedReader br=new BufferedReader(new FileReader(localpaths[i].toString()));
                                while((akeyword=br.readLine()) !=null){
                                        keyword.add(akeyword);
                                }
                                br.close();
                        }
                }
                public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                        String line=value.toString();
                        StringTokenizer token=new StringTokenizer(line);
                        while(token.hasMoreTokens()){
                                String word=token.nextToken();
                                if(keyword.contains(word)){
                                        return;
                                }
                                context.write(new Text(word), one);
                        }
                }
        }
        public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
                        int sum=0;
                        Iterator<IntWritable> it=values.iterator();
                        while(it.hasNext()){
                                sum+=it.next().get();
                        }
                        context.write(key, new IntWritable(sum));
                }
        }
        public static void main(String[] args) throws Exception {
                // TODO Auto-generated method stub
                Configuration conf=new Configuration();
                String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
                if(otherArgs.length !=3){
                        System.err.print("Usage: <in> <out> <URI>");
                        System.exit(3);
                }
                DistributedCache.addCacheArchive(new Path(otherArgs[2]).toUri(), conf);
                Job job=new Job(conf,"WC");
                job.setJarByClass(WC.class);
                job.setMapperClass(Map.class);
                job.setCombinerClass(Reduce.class);
                job.setReducerClass(Reduce.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}
报错部分:

2016-05-20 10:43:06,102 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-05-20 10:43:06,134 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local692170768_0001 running in uber mode : false
2016-05-20 10:43:06,137 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 0% reduce 0%
2016-05-20 10:43:06,169 WARN  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local692170768_0001
java.lang.Exception: java.lang.NullPointerException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
        at org.apache.test.WC$Map.setup(WC.java:38)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2016-05-20 10:43:07,144 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_local692170768_0001 failed with state FAILED due to: NA
2016-05-20 10:43:07,156 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0

已有(1)人评论

跳转到指定楼层
NEOGX 发表于 2016-5-20 12:19:24
    if(otherArgs.length !=3){
                        System.err.print("Usage: <in> <out> <URI>");
                        System.exit(3);
                }

这个逻辑不严密啊,应该是<=3
楼主可以通过打印的方式调试下

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条