OP, have a look at this program and it should become clear. The version is fairly old, but it is enough to illustrate the point.
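[mw_shl_code=java,true]import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {
    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;

        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokens = line.split("-");
            // (1) key and integer field go to the named output "MOSInt"
            mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
            // (2) key and third field go to "MOSText"; wrapped in Text to match the declared value class
            mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
            // (3) a base output path can also be given to write into a specific file or directory
            mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close(); // flush and close all named outputs
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Old-style constructor; on Hadoop 2.x and later, Job.getInstance(conf, name) replaces it.
        Job job = new Job(conf, "word count with MultipleOutputs");
        job.setJarByClass(TestwithMultipleOutputs.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0); // map-only job
        MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, Text.class);
        // Return the status instead of calling System.exit here; main() exits with it.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
        System.exit(res);
    }
}[/mw_shl_code]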
Test data:
abc-1232-hdf
abc-123-rtd
ioj-234-grjth
ntg-653-sdgfvd
kju-876-btyun
bhm-530-bhyt
hfter-45642-bhgf
bgrfg-8956-fmgh
jnhdf-8734-adfbgf
ntg-68763-nfhsdf
ntg-98634-dehuy
hfter-84567-drhuk
Result screenshot (the output is written to /test/testMOSout):
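To make the routing easier to follow, here is a rough sketch in plain Java (no Hadoop needed) that walks one test line through the same split and shows which named output each of the three `mos.write` calls targets. `RoutingSketch` and `route` are names made up for this illustration; they are not part of the job above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, only for illustration: mirrors the split("-") logic of the mapper.
final class RoutingSketch {
    /** Describes where the three mos.write calls would send one input line. */
    static List<String> route(String line) {
        String[] tokens = line.split("-");
        List<String> out = new ArrayList<>();
        // (1) named output MOSInt: key + integer field
        out.add("MOSInt -> " + tokens[0] + "\t" + Integer.parseInt(tokens[1]));
        // (2) named output MOSText: key + third field
        out.add("MOSText -> " + tokens[0] + "\t" + tokens[2]);
        // (3) named output MOSText with base output path "<key>/": key + whole line
        out.add("MOSText [" + tokens[0] + "/] -> " + tokens[0] + "\t" + line);
        return out;
    }

    public static void main(String[] args) {
        for (String s : route("abc-1232-hdf")) {
            System.out.println(s);
        }
    }
}
```

So for the test data, call (3) should produce one subdirectory per distinct first field (abc, ioj, ntg, ...) under the job output directory. As far as I know, in a map-only job the files inside look like `abc/-m-00000`, but it is worth checking on your own Hadoop version.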
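Next:
Hadoop can assign a different mapper to each input path.
For files coming from different paths, OP, you can tag the records in the mapper with a marker for their source,
and then write them out to different paths based on that tag.
MultipleInputs can load input files from different paths, and each path can use a different mapper: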
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
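The tagging idea could look roughly like the sketch below. It is plain Java so it runs without a cluster; `SourceTagSketch`, `tag`, `baseOutputPath` and `payload` are hypothetical names, and the tab separator is just an assumption. The idea is that each per-path mapper (e.g. `MultiTypeFileInput1Mapper`) would call `tag` with its own source name, and later the tag is turned into the base output path passed as the last argument of `mos.write`.

```java
// Hypothetical helper, only for illustration: tags a record with its source
// and derives an output subdirectory from that tag.
final class SourceTagSketch {
    static final String SEP = "\t"; // assumed separator; pick one that never occurs in the tag

    /** What a per-path mapper would emit: the source tag in front of the line. */
    static String tag(String source, String line) {
        return source + SEP + line;
    }

    /** Base output path for a tagged record, e.g. "path1/". */
    static String baseOutputPath(String tagged) {
        return tagged.substring(0, separatorIndex(tagged)) + "/";
    }

    /** The original line with the tag stripped off again. */
    static String payload(String tagged) {
        return tagged.substring(separatorIndex(tagged) + SEP.length());
    }

    private static int separatorIndex(String tagged) {
        int i = tagged.indexOf(SEP);
        if (i < 0) {
            throw new IllegalArgumentException("record has no source tag: " + tagged);
        }
        return i;
    }

    public static void main(String[] args) {
        String tagged = tag("path1", "abc-1232-hdf");
        System.out.println(baseOutputPath(tagged) + " <- " + payload(tagged));
    }
}
```

In the real job, something like `mos.write("MOSText", key, new Text(SourceTagSketch.payload(t)), SourceTagSketch.baseOutputPath(t))` would then send records from each input path to their own directory.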