Below is my MapReduce program. I packaged it into a jar in Eclipse, copied it to the Linux machine, and ran it; it throws this exception:
15/09/05 04:58:37 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)
15/09/05 04:58:38 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)
15/09/05 04:58:39 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)
The strange part is: if I remove the MyPartitionerPar class from the program below, the job runs and produces results, but as soon as I add that class back, it throws the exception above.
Any help would be greatly appreciated, thanks!!!!
Here is the MR code:
[mw_shl_code=java,true]package hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyPartitioner {
    // Map function: split each tab-separated line and emit <arr[1], Databean(up, down)>
    public static class MyMap extends Mapper<LongWritable, Text, Text, Databean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t");
            // Check the field count before indexing, otherwise short lines throw ArrayIndexOutOfBoundsException
            if (arr.length > 9) {
                Databean bean = new Databean("", Long.parseLong(arr[8]), Long.parseLong(arr[9]), 0L);
                context.write(new Text(arr[1]), bean);
            }
        }
    }
    public static class MyReduce extends Reducer<Text, Databean, Text, Databean> {
        @Override
        protected void reduce(Text k2, Iterable<Databean> v2s, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0L;
            long down_sum = 0L;
            long sumCount = 0L;
            for (Databean bean : v2s) {
                up_sum += bean.getUp_sum();
                down_sum += bean.getDown_sum();
            }
            sumCount = up_sum + down_sum;
            Databean data = new Databean("", up_sum, down_sum, sumCount);
            context.write(k2, data);
        }
    }
    public static class MyPartitionerPar extends Partitioner<Text, Databean> {
        /**
         * getPartition()
         * Inputs: the <key, value> pair and the number of reducers (numPartitions)
         * Output: the reducer (partition) number this record is assigned to
         */
        @Override
        public int getPartition(Text key, Databean data, int numPartitions) {
            String k = key.toString();
            // Guard the length before substring() to avoid StringIndexOutOfBoundsException on short keys
            if (k.length() >= 2) {
                String prefix = k.substring(0, 2);
                if ("13".equals(prefix)) {
                    return 1;
                } else if ("15".equals(prefix)) {
                    return 2;
                } else if ("18".equals(prefix)) {
                    return 3;
                }
            }
            return 0;
        }
    }
    public static void main(String[] args) throws Exception {
        @SuppressWarnings("deprecation")
        Job conf = new Job(new Configuration());
        conf.setJobName("MyPartitioner");
        conf.setJarByClass(MyPartitioner.class);
        // The partitioner returns partition numbers 0-3, so 4 reducers are set here
        conf.setNumReduceTasks(4);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Databean.class);
        // Set the custom partitioner class
        conf.setPartitionerClass(MyPartitionerPar.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Databean.class);
        // Set the mapper and reducer classes
        conf.setMapperClass(MyMap.class);
        conf.setReducerClass(MyReduce.class);
        FileInputFormat.setInputPaths(conf, new Path("/dataOut/dat.log"));
        FileOutputFormat.setOutputPath(conf, new Path("/dataOut/webout2"));
        conf.waitForCompletion(true);
    }
} [/mw_shl_code]
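The Databean class referenced above is not included in the post. For anyone trying to reproduce this, here is a minimal sketch of what a Hadoop Writable with the constructor and getters used by the mapper/reducer might look like; the field names, serialization order, and toString() format are my assumptions, not the original class:
[mw_shl_code=java,true]package hdfs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal sketch of the Databean value type; field names and order are assumed
public class Databean implements Writable {
    private String phone;    // assumed meaning of the first constructor argument
    private long up_sum;     // upstream traffic
    private long down_sum;   // downstream traffic
    private long sumCount;   // total traffic

    // Hadoop instantiates value objects reflectively, so a no-arg constructor is required
    public Databean() {
    }

    public Databean(String phone, long up_sum, long down_sum, long sumCount) {
        this.phone = phone;
        this.up_sum = up_sum;
        this.down_sum = down_sum;
        this.sumCount = sumCount;
    }

    public long getUp_sum() {
        return up_sum;
    }

    public long getDown_sum() {
        return down_sum;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up_sum);
        out.writeLong(down_sum);
        out.writeLong(sumCount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        up_sum = in.readLong();
        down_sum = in.readLong();
        sumCount = in.readLong();
    }

    @Override
    public String toString() {
        return up_sum + "\t" + down_sum + "\t" + sumCount;
    }
} [/mw_shl_code]
Note that write() and readFields() must serialize the fields in the same order, and the no-argument constructor is needed for deserialization on the reduce side.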