分享

使用DistributedCache出错

nickpro 2014-9-26 23:18:23 发表于 异常错误 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 11 37345
本帖最后由 nickpro 于 2014-9-26 23:20 编辑

现象:在Hadoop 2.x上,用新的DistributedCache的API,在mapper中会获取不到这个cache文件。

将HDFS文件添加到distributed cache中:
        job.addCacheFile(new Path(inputFileOnHDFS).toUri());
在mapper的setup()方法中:
        Configuration conf = context.getConfiguration();
        URI[] localCacheFiles = context.getCacheFiles();
        readCacheFile(localCacheFiles[0]);
        其中,readCacheFile()是我们自己的读取cache文件的方法,可能是这样做的(仅举个例子):
        private static void readCacheFile(URI cacheFileURI) throws IOException {
  BufferedReader reader = new BufferedReader(new FileReader(cacheFileURI.getPath()));
  String line;
  while ((line = reader.readLine()) != null) {
    //TODO: your code here
  }
  reader.close();
}
问题发现 context.getCacheFiles() 总是返回null,也就是你无法读到cache文件。

已有(10)人评论

跳转到指定楼层
howtodown 发表于 2014-9-27 00:42:36
最好贴出代码,相信你的思路是没有错的。

这里提几个不一样的地方,你可以尝试修改:
    job.addCacheFile(new Path(inputFileOnHDFS).toUri());

job.addCacheFile(new URI(inputFileOnHDFS));
例如:
job.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"));


把这个
URI[] localCacheFiles = context.getCacheFiles();
改成下面其中一种形式:
private Path[]   localArchives = context.getLocalCacheArchives();
private Path[]  localFiles = context.getLocalCacheFiles();


回复

使用道具 举报

nickpro 发表于 2014-9-27 08:04:34
howtodown 发表于 2014-9-27 00:42
最好贴出代码,相信你的思路是没有错的。

这里提几个不一样的地方,你可以尝试修改:

package hadoop.mapjoin;

import hadoop.other.Stock;
import hadoop.other.StockPrices;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoin extends Configured implements Tool {
        enum TESTW{
                MISSING,
                HELLO;
        }
        public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Stock, StockPrices> {
                private String stockSymbol;
                private HashMap<Stock,Double> stocks = new HashMap<Stock,Double>();
                @Override
                protected void setup(Context context) throws IOException, InterruptedException {
                        stockSymbol = context.getConfiguration().get("stockSymbol");
                        Configuration conf = context.getConfiguration();
                        //Path[] files = context.getLocalCacheFiles();
                        Path[] files = DistributedCache.getLocalCacheFiles(conf);
                        FileSystem fs = FileSystem.getLocal(conf);
                        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files[0])));
                        String[] splits = br.readLine().split(",");
                        if(splits[1].equals(stockSymbol)){
                                stocks.put(new Stock(splits[1],splits[2]), Double.valueOf(splits[3]));
                        }
                }

                @Override
                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        Stock tmp = null;
                        String currentLine = value.toString();
                       
                        String[] splits = currentLine.split(",");
                        if(splits[1].equals(stockSymbol)){
                                context.getCounter(TESTW.HELLO).increment(1);
                                context.getCounter(TESTW.MISSING).increment(stocks.size());
                                tmp = new Stock(splits[1],splits[2]);
                        }
                       
                        if(stocks.containsKey(tmp)){
                                context.write(tmp, new StockPrices(stocks.get(tmp), Double.valueOf(splits[6])));
                        }
                }
        }

        public int run(String[] args) throws Exception {
                Job job = Job.getInstance(getConf(), "MapSideJoinJob");
                job.setJarByClass(getClass());
                Configuration conf = job.getConfiguration();
                conf.set("stockSymbol", args[0]);
                conf.set(TextOutputFormat.SEPERATOR, ",");
               
                Path out = new Path("joinoutput");
                out.getFileSystem(conf).delete(out,true);
                FileInputFormat.setInputPaths(job, new Path("stocks"));
                FileOutputFormat.setOutputPath(job, out);
                DistributedCache.addCacheFile(new Path("dividends/NYSE_dividends_A.csv").toUri() , conf);
                //job.addCacheFile(new Path("dividends/NYSE_dividends_A.csv").toUri());
                job.setMapperClass(MapSideJoinMapper.class);
                job.setInputFormatClass(TextInputFormat.class);
               
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setMapOutputKeyClass(Stock.class);
                job.setMapOutputValueClass(StockPrices.class);
               
                job.setNumReduceTasks(0);
               
                return job.waitForCompletion(true)?0:1;
        }

        public static void main(String[] args) {
                int result = 0;
                try {
                        result = ToolRunner.run(new Configuration(),  new MapSideJoin(), args);
                } catch (Exception e) {
                        e.printStackTrace();
                }
                System.exit(result);
        }
}


回复

使用道具 举报

howtodown 发表于 2014-9-27 11:20:01
nickpro 发表于 2014-9-27 08:04
package hadoop.mapjoin;

import hadoop.other.Stock;
DistributedCache.addCacheFile(new Path("/dividends/NYSE_dividends_A.csv").toUri() , conf);
前面加上斜杠/
回复

使用道具 举报

nickpro 发表于 2014-9-27 11:40:28
本帖最后由 nickpro 于 2014-9-27 11:42 编辑
howtodown 发表于 2014-9-27 11:20
DistributedCache.addCacheFile(new Path("/dividends/NYSE_dividends_A.csv").toUri() , conf);
前面加 ...

加上“/”肯定出错啊,我这个文件又不是放在根目录里面
回复

使用道具 举报

sstutu 发表于 2014-9-27 11:49:15
虽然没有做过,但是感觉你的conf配置有问题,你可以先写一个简单的例子,然后在做这个复杂的例子。

回复

使用道具 举报

nickpro 发表于 2014-9-27 11:51:13
sstutu 发表于 2014-9-27 11:49
虽然没有做过,但是感觉你的conf配置有问题,你可以先写一个简单的例子,然后在做这个复杂的例子。

已经写过好几个例子了,没有出错,就是做到这个例子的时候出错了
回复

使用道具 举报

sstutu 发表于 2014-9-27 11:53:48
nickpro 发表于 2014-9-27 11:51
已经写过好几个例子了,没有出错,就是做到这个例子的时候出错了
你可以对比这个例子,跟其他的例子的区别在什么地方,如果找不出来,你可以在原先的例子上,不断修改,逐渐改成你的这个例子。就能找到问题所在了。
回复

使用道具 举报

nickpro 发表于 2014-9-27 11:55:20
sstutu 发表于 2014-9-27 11:53
你可以对比这个例子,跟其他的例子的区别在什么地方,如果找不出来,你可以在原先的例子上,不断修改,逐 ...

http://www.codelast.com/?p=8131   这是我在网上找的,你看一下,该怎么解决?

nickpro 发表于 2014-9-27 12:15:52
sstutu 发表于 2014-9-27 11:53
你可以对比这个例子,跟其他的例子的区别在什么地方,如果找不出来,你可以在原先的例子上,不断修改,逐 ...

http://www.codelast.com/?p=8131  这是我在网上找到的,他说使用旧api可以成功,不过我试了一下,也没法成功啊
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条