分享

跑MR出现ipc.Client: Retrying connect to server异常

啊耀 发表于 2015-9-5 16:16:43 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 14032
下面是MR程序代码,在Eclipse上把程序打成jar包,丢到Linux上运行,报异常:
15/09/05 04:58:37 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017.Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)
15/09/05 04:58:38 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)
15/09/05 04:58:39 INFO ipc.Client: Retrying connect to server: hadoop/192.168.1.152:56017. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1 SECONDS)


问题奇怪的是,如果把下面程序的中MyPartitionerPar类去掉,可以跑出结果,一旦加上这个类,就报上面的异常。
求解,万分感觉,谢谢!!!!

下面是MR代码:
[mw_shl_code=java,true]package hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  
public class MyPartitioner {  
    // Map函数  
    public static class MyMap extends Mapper<LongWritable, Text, Text, Databean> {  
        @Override
                protected void map(LongWritable key, Text value,Context context)
                                throws IOException, InterruptedException {
                String[] arr = value.toString().split("\t");
                        Databean bean = new Databean("", Long.parseLong(arr[8]) ,Long.parseLong(arr[9]),0L);
                        if(arr != null && arr.length > 0){
                                context.write(new Text(arr[1]), bean);
                        }
                }
    }  
      
    public static class MyReduce extends Reducer<Text, Databean,Text, Databean> {
           
        @Override
                protected void reduce(Text k2, Iterable<Databean> v2s, Context context)
                                throws IOException, InterruptedException {
                long up_sum =0l;
                        long down_sum =0l;
                        long sumCount = 0L;
                        for(Databean bean : v2s){
                                up_sum += bean.getUp_sum();
                                down_sum += bean.getDown_sum();
                        }
                        sumCount = up_sum + down_sum;
                       
                        Databean data = new Databean("", up_sum, down_sum, sumCount);
                        context.write(k2, data);
        }
    }  
  
    public static class MyPartitionerPar extends Partitioner<Text, Databean> {  
        /**
         * getPartition()方法的
         * 输入参数:键/值对<key,value>与reducer数量numPartitions
         * 输出参数:分配的Reducer编号,这里是result
         * */  
        @Override  
        public int getPartition(Text key, Databean data, int numPartitions) {  
            // TODO Auto-generated method stub  
                int code = 0;
                        String sub_clss = key.toString().substring(0,3);
                        if(sub_clss != null && !"".equals(sub_clss)){
                                if("13".equals(sub_clss.substring(0, 2))){
                                        return 1;
                                }else if ("15".equals(sub_clss.substring(0, 2))) {
                                        return 2;
                                }else if ("18".equals(sub_clss.substring(0, 2))) {
                                        return 3;
                                }
                        }
                return code;
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        @SuppressWarnings("deprecation")
                Job conf = new Job(new Configuration());
        conf.setJobName("MyPartitioner");  
        conf.setJarByClass(MyPartitioner.class);
        
        //控制reducer数量,因为要分4个区,所以这里设定了3个reducer  
        conf.setNumReduceTasks(4);  
  
        conf.setMapOutputKeyClass(Text.class);  
        conf.setMapOutputValueClass(Databean.class);  
  
        //设定分区类  
        conf.setPartitionerClass(MyPartitionerPar.class);  
  
        conf.setOutputKeyClass(Text.class);  
        conf.setOutputValueClass(Databean.class);  
  
        //设定mapper和reducer类  
        conf.setMapperClass(MyMap.class);  
        conf.setReducerClass(MyReduce.class);  
  
        FileInputFormat.setInputPaths(conf, new Path("/dataOut/dat.log"));  
        FileOutputFormat.setOutputPath(conf, new Path("/dataOut/webout2"));  
        conf.waitForCompletion(true);
    }  
}  [/mw_shl_code]

已有(2)人评论

跳转到指定楼层
Alkaloid0515 发表于 2015-9-5 17:12:57
Databean data是否引用了相关包
回复

使用道具 举报

啊耀 发表于 2015-9-6 10:01:43
Alkaloid0515 发表于 2015-9-5 17:12
Databean data是否引用了相关包

应该不是jar包的问题,因为不加分区,这个MR还是可以跑出结果的。DataBean:
[mw_shl_code=java,true]package hdfs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Databean implements Writable{
        private String telNo;
        private long up_sum;
        private long down_sum;
        private long sumCount;
       
       
        public Databean() {}
       
       
       
        public Databean(String telNo, long up_sum, long down_sum, long sumCount) {
                this.telNo = telNo;
                this.up_sum = up_sum;
                this.down_sum = down_sum;
                this.sumCount = sumCount;
        }



        @Override
        public void write(DataOutput out) throws IOException {
                out.writeUTF(telNo);
                out.writeLong(up_sum);
                out.writeLong(down_sum);
                out.writeLong(sumCount);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
                this.telNo = in.readUTF();
                this.up_sum = in.readLong();
                this.down_sum = in.readLong();
                this.sumCount = in.readLong();
        }

        @Override
        public String toString() {
                return this.up_sum + "\t" + this.down_sum + "\t" + this.sumCount;
        }
       
        public String getTelNo() {
                return telNo;
        }

        public void setTelNo(String telNo) {
                this.telNo = telNo;
        }

        public long getUp_sum() {
                return up_sum;
        }

        public void setUp_sum(long up_sum) {
                this.up_sum = up_sum;
        }

        public long getDown_sum() {
                return down_sum;
        }

        public void setDown_sum(long down_sum) {
                this.down_sum = down_sum;
        }

        public long getSumCount() {
                return sumCount;
        }

        public void setSumCount(long sumCount) {
                this.sumCount = sumCount;
        }
       
       

}
[/mw_shl_code]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条