@easthome001:你好
我在工作中:用mr向hdfs输出数据,启用3个reduce。我的value是字符串类型,是new text(value)传入自定义的recordwrite里 我直接 out.write(o.toString().getBytes());但是输入到hdfs为空 。我在 out.write(o.toString().getBytes())前打印日志 system.out(value.tostring())是有数据的呀,到现在没找到空的原因:代码
public class InputEdgeInReplaceIDFormat <K, V> extends FileOutputFormat<K, V>{
public static final Logger LOG = Logger.getLogger(InputEdgeInReplaceIDFormat.class);
public static class ReplaceRecordWriter<K, V> extends RecordWriter<K, V> {
private DataOutputStream fileWrite;
public ReplaceRecordWriter(Configuration conf, DataOutputStream out)
throws IOException {
this.fileWrite = out;
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println(value.toString()+"\n");
// 开始写数据
fileWrite.write((value.toString()+"\n").getBytes());
}
}
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 创建输出管道
Configuration conf = context.getConfiguration();
Path file = getDefaultWorkFile(context, "");
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new ReplaceRecordWriter<K, V>(conf,fileOut);
}
|