分享

hadoop 0.20.2 全分布式下 DistributedCahe的使用问题

RUIJILIANG 发表于 2013-10-16 13:38:42 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 5946
问题如下:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ProcedureJob extends Configured{
        public static class MapClass extends  Mapper{
                private Path[] localFiles;
                private URI[] uris;
                @Override
                protected void setup(Context context) throws IOException,
                                InterruptedException {
                       localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                       uris = DistributedCache.getCacheFiles(context.getConfiguration());
                       System.out.println(uris[0].toString()+"+++++++++++++++"+localFiles+"----------------");
                       FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), context.getConfiguration());
                       FSDataInputStream in = null;
                       in = fs.open(new Path(uris[0].getPath()));
                       BufferedReader br=new BufferedReader(new InputStreamReader(in));
                       String str="";
                       while((str=br.readLine())!=null){
                               System.out.println(str+"---------------------------------------------");
                       }
                       br.close();
                }
                protected void map(Object key, Text value, Context context)
                                throws IOException, InterruptedException {
                        
                      context.write(new Text(key.toString()), new Text("1"));
                }
        }
        
        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
               
                DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);  
                DistributedCache.createSymlink(conf);
               
                Job job = new Job(conf,"Test");
                job.setJarByClass(ProcedureJob.class);
                FileInputFormat.setInputPaths(job,new Path(args[1]));
                FileOutputFormat.setOutputPath(job, new Path(args[2]));
                job.setJobName("Test");
                job.setMapperClass(MapClass.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setNumReduceTasks(0);
                job.waitForCompletion(true);
        }
}
控制台输出信息:
hdfs://namenode:9000/data/public/test.txt+++++++++++++++null----------------
1        5.1        3.5        1.4        0.2        1---------------------------------------------
2        4.9        3        1.4        0.2        1---------------------------------------------
3        4.7        3.2        1.3        0.2        1---------------------------------------------
51        7        3.2        4.7        1.4        2---------------------------------------------
52        6.4        3.2        4.5        1.5        2---------------------------------------------
53        6.9        3.1        4.9        1.5        2---------------------------------------------
147        6.3        2.5        5        1.9        3---------------------------------------------
148        6.5        3        5.2        2        3---------------------------------------------
149        6.2        3.4        5.4        2.3        3---------------------------------------------
150        5.9        3        5.1        1.8        3---------------------------------------------
也就是说:我从HDFS添加了一个文件到DistributedCahe中,但是利用DistributedCache.getLocalCacheFiles(context.getConfiguration())方法得到的对象为空,而uris = DistributedCache.getCacheFiles(context.getConfiguration());能得到正确的URI,但是不知所以然,请各位大神指点一二,不胜感激。
              
               
            

已有(1)人评论

跳转到指定楼层
liyjs 发表于 2013-10-24 21:06:38
我是个凑数的。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条