Reducer端数据接收不到,也就是迭代器中貌似就没有数据!求解
package com.pri;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author 吕梁彪
* 此次在Reducer进行join操作
* 输入路径为多个
*/
public class Mutil_File_ReducerMeger extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Mutil_File_ReducerMeger");
job.setJarByClass(Mutil_File_ReducerMeger.class);
job.setMapperClass(Mutil_File_ReducerMeger_Mapper.class);
job.setReducerClass(Mutil_File_ReducerMeger_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LLBWritable.class);
job.setOutputKeyClass(LLBWritable.class);
job.setOutputValueClass(NullWritable.class);
//job.setNumReduceTasks(0);
Path path = new Path("D:\\IOoperation\\MapReducer\\output\\out1");
FileSystem fs = FileSystem.get(conf);
if(fs.exists(path)) {
fs.delete(path,true);
}
FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_information.txt,D:\\IOoperation\\MapReducer\\input\\shop.txt");
FileOutputFormat.setOutputPath(job, path);
return job.waitForCompletion(true)?0:1;
}
public static class LLBWritable implements Writable,Serializable{
private static final long serialVersionUID = 1L;
private String goodsName;
private int goodsId;
private int goodsPrice;
private int goodsSales;
private int goodsFlag;
public void set(String goodsName,int goodsId,int goodsPrice,int goodsSales,int goodsFlag) {
this.goodsName = goodsName;
this.goodsId = goodsId;
this.goodsPrice = goodsPrice;
this.goodsSales = goodsSales;
this.goodsFlag = goodsFlag;
}
public int getGoodsFlag() {
return goodsFlag;
}
public void setGoodsFlag(int goodsFlag) {
this.goodsFlag = goodsFlag;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public int getGoodsId() {
return goodsId;
}
public void setGoodsId(int goodsId) {
this.goodsId = goodsId;
}
public int getGoodsPrice() {
return goodsPrice;
}
public void setGoodsPrice(int goodsPrice) {
this.goodsPrice = goodsPrice;
}
public int getGoodsSales() {
return goodsSales;
}
public void setGoodsSales(int goodsSales) {
this.goodsSales = goodsSales;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(goodsName);
out.writeInt(goodsId);
out.writeInt(goodsPrice);
out.writeInt(goodsSales);
out.writeInt(goodsFlag);
}
@Override
public void readFields(DataInput in) throws IOException {
in.readUTF();
in.readInt();
in.readInt();
in.readInt();
in.readInt();
}
@Override
public String toString() {
return "Goods_Information [goodsName=" + goodsName + ", goodsId=" + goodsId + ", goodsPrice=" + goodsPrice
+ ", goodsSales=" + goodsSales + "]";
}
}
public static class Mutil_File_ReducerMeger_Mapper extends Mapper<LongWritable, Text, Text, LLBWritable>{
Text k = new Text();
LLBWritable llb = new LLBWritable();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//获取输入文件的名称(便于分类)
FileSplit fs =(FileSplit)context.getInputSplit();
String name = fs.getPath().getName();
String[] values;
if(name.contains("information")) {
k.set("shop_information");
values = StringUtils.split(value.toString(),"\t");
for(int i = 0 ; i < values.length ; i++) {
llb.set(values,Integer.parseInt(values), 0, Integer.parseInt(values), 2);
}
}else {
k.set("shop");
values = StringUtils.split(value.toString(),"\t");
for(int i = 0 ; i < values.length ; i++) {
llb.set("", Integer.parseInt(values), Integer.parseInt(values), 0, 1);
}
}
context.write(k, llb);
}
}
public static class Mutil_File_ReducerMeger_Reducer extends Reducer<Text, LLBWritable, LLBWritable, NullWritable>{
ArrayList<LLBWritable> list = new ArrayList<>();
LLBWritable llb1 = new LLBWritable();
@Override
protected void reduce(Text arg0, Iterable<LLBWritable> values,Context context) throws IOException, InterruptedException {
for (LLBWritable value : values) {
if("shop".equals(arg0.toString())) {
try {
BeanUtils.copyProperties(llb1,value);
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}else {
try {
LLBWritable llb2 =new LLBWritable();
BeanUtils.copyProperties(llb2,value);
list.add(llb2);
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
for (LLBWritable llb : list) {
llb.setGoodsPrice(llb1.getGoodsPrice());
context.write(llb, NullWritable.get());
}
}
}
public static void main(String[] args) {
try {
System.out.println(ToolRunner.run(new Mutil_File_ReducerMeger(), args));
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
这是在哪运行的。
FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_information.txt,D:\\IOoperation\\MapReducer\\input\\shop.txt");
FileOutputFormat.setOutputPath(job, path);
路径不建议使用window,而且如果换个地方,这个运行就会出错,而且不能读取文件。
最好使用hdfs路径。
只是作为实验的在window下测试一下reducer side join 方法,就是reducer中iterator没有值,但是key是有值存在的。如果将reducer的输出设置为0的话,map端也可以输出数据! sstutu 发表于 2017-12-1 16:07
这是在哪运行的。
FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_infor ...
只是作为实验的在window下测试一下reducer side join 方法,就是reducer中iterator没有值,但是key是有值存在的。如果将reducer的输出设置为0的话,map端也可以输出数据!
问题已经解决了,原因是序列化也就是自定义类型中出了点问题! 学习学习,一名小白! 逸辰不逸晨 发表于 2018-2-28 17:18
问题已经解决了,原因是序列化也就是自定义类型中出了点问题!
序列化中定义的类出现了什么问题?
页:
[1]