The problem is as follows:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ProcedureJob extends Configured {

    public static class MapClass extends Mapper<Object, Text, Text, Text> {
        private Path[] localFiles;
        private URI[] uris;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Both lookups read the same job configuration, yet only one works:
            // getLocalCacheFiles() comes back null, getCacheFiles() returns the URI.
            localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            uris = DistributedCache.getCacheFiles(context.getConfiguration());
            System.out.println(uris[0].toString() + "+++++++++++++++" + localFiles + "----------------");

            // As a workaround, read the cached file straight from HDFS via its URI.
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), context.getConfiguration());
            FSDataInputStream in = fs.open(new Path(uris[0].getPath()));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String str;
            while ((str = br.readLine()) != null) {
                System.out.println(str + "---------------------------------------------");
            }
            br.close();
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.toString()), new Text("1"));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // args[0]: HDFS path of the file to distribute via the DistributedCache.
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
        DistributedCache.createSymlink(conf);

        Job job = new Job(conf, "Test");
        job.setJarByClass(ProcedureJob.class);
        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(MapClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only job
        job.waitForCompletion(true);
    }
}
Console output:
hdfs://namenode:9000/data/public/test.txt+++++++++++++++null----------------
1 5.1 3.5 1.4 0.2 1---------------------------------------------
2 4.9 3 1.4 0.2 1---------------------------------------------
3 4.7 3.2 1.3 0.2 1---------------------------------------------
51 7 3.2 4.7 1.4 2---------------------------------------------
52 6.4 3.2 4.5 1.5 2---------------------------------------------
53 6.9 3.1 4.9 1.5 2---------------------------------------------
147 6.3 2.5 5 1.9 3---------------------------------------------
148 6.5 3 5.2 2 3---------------------------------------------
149 6.2 3.4 5.4 2.3 3---------------------------------------------
150 5.9 3 5.1 1.8 3---------------------------------------------
In other words: I added a file from HDFS to the DistributedCache, but the object returned by DistributedCache.getLocalCacheFiles(context.getConfiguration()) is null, while uris = DistributedCache.getCacheFiles(context.getConfiguration()) returns the correct URI. I don't understand why. Any pointers would be greatly appreciated.
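For reference, this is the symlink-based variant I pieced together from the DistributedCache javadoc and would have expected to work as well. It is only a sketch: the "#cachefile" alias is a name I made up for illustration, and my understanding (possibly wrong) is that createSymlink(conf) only links cache files whose URI carries a fragment, with the fragment becoming the link name in the task's working directory:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SymlinkSketch {
    public static class MapClass extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The fragment after '#' should become the link name in the task's
            // working directory, so the cached file reads like a local file.
            BufferedReader br = new BufferedReader(new FileReader("cachefile"));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
            br.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same file as above, but with a made-up alias fragment appended.
        DistributedCache.addCacheFile(
                new URI("hdfs://namenode:9000/data/public/test.txt#cachefile"), conf);
        DistributedCache.createSymlink(conf);
        // ... job setup identical to the code above ...
    }
}

If getLocalCacheFiles() really does not work in my setup, reading through such a symlink (or straight from HDFS via getCacheFiles(), as in my setup() above) seems to be the only option left.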