分享

MapReduce处理key-value问题,Mapreduce调试问题

Timmy 发表于 2015-11-12 10:08:30 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 12356
问题1:Mapreduce中,使用默认TextInputFormat时,数据是按照每行读取的,这里key是默认key,value是我自定义的Trans对象;我将所有数据处理都写在了一个map中,Map中的功能被分为4块分别被写入了calculateSup(trans),calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)中,但是现在出现一个问题,当进入第一个calculateSup(trans)函数之后,hadoop就会一直向下读取key—value,从而还没进入后面调用的calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)方法,就已经把所有数据读取完了,如果我要达到一条记录分别被calculateSup(trans),calculatePrefer(trans),calculateGradedPrefer(trans),calculateIntegral(trans)这些方法依次处理后在依次处理下一条方法的话,需要如何设置?

问题2:请问如何能简便快捷的对Mapreduce程序进程调试,我现在的处理方法是在程序中打印控制台输出语句System.out.println(),之后将程序打包为jar包上传到Namenode节点运行之后,输出日志进行调试,还有其他更方面的调试方法吗?

/**
* 匹配优惠政策的MAPPER 1.在Mapper启动时加载基础资料到内存 2.读取流水,匹配优惠政策,计算收支:
* .读取一条流水,转换交易流水,流水无效则写入错误文件,
*/
public class MatchPPMapper extends Mapper<Object, Text, Text, GBKText> {
        private static Logger logger = LoggerFactory.getLogger(MatchPPMapper.class);
        // debug开关
        public static boolean flag = false;
        public static boolean flag2 = false;
        public static boolean iflag = false;

        private ArrayList<SupCommision> supList = new ArrayList<SupCommision>();// 优惠规则列表
        private ArrayList<GoodsPrice> gpriceList = new ArrayList<GoodsPrice>();
        private ArrayList<ClientDiscount> clientList = new ArrayList<ClientDiscount>();
        private ArrayList<TieredData> tieredDataList = new ArrayList<TieredData>();

        // 新增列表:1、客户优惠支出列表,2、客户达量支出列表,3、客户积分匹配规则列表,4、客户会员列表 5、客户达量信息列表
        private ArrayList<Member> memberList = new ArrayList<Member>();
        private ArrayList<Integral> integralList = new ArrayList<Integral>();
        private ArrayList<CustomPreferential> cPreferList = new ArrayList<CustomPreferential>();
        private ArrayList<GradedPreferential> gPreferList = new ArrayList<GradedPreferential>();
        private ArrayList<GradedData> gradedDataList = new ArrayList<GradedData>();
        private Text outPutKey = new Text();
        private GBKText outPutValue = new GBKText();
        private PolicyMatch sm = new PolicyMatch();
        private Calculate cal = new Calculate();

        private String randomStr = "0"; // 在Mapper运行前获取一个随机数每个map输出不同的文件名,如果不设置,后面的map会把前面的输出覆盖。

        @Override
        protected void setup(Context context) throws IOException,
                        InterruptedException {
                // 获得输入文件的路径名
                randomStr = UUID.randomUUID().toString();
                BufferedReader in = null;
                logger.warn("Map setup....................");
                try {
                        // 从当前作业中获取要缓存的文件
                        Path[] paths = DistributedCache.getLocalCacheFiles(context
                                        .getConfiguration());
                        if (flag && flag2) {
                                for (Path p : paths) {
                                        System.out.println("##文件路径是: " + p.toString());
                                }
                        }

                        String value = null;

                        if (flag && flag2) {
                                System.out.println("正在导入原始数据!!");
                        }
                        for (Path path : paths) {
                                if (flag && flag2) {
                                        System.out.println("路径 path: " + path.toString());
                                }

                                if (path.toString().contains("sup_commission.txt")) {
                                        SupCommision sup = null;

                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));

                                        while (null != (value = in.readLine())) {
                                                sup = new SupCommision();
                                                if (flag && flag2) {
                                                        // System.out.println("读取行数据"+value);
                                                }
                                                if (sup.parse(value)) {
                                                        supList.add(sup);
                                                }
                                        }
                                        if (flag && flag2) {
                                                System.out.println("SupNum:" + supList.size());
                                        }
                                } else if (path.toString().contains("goods_price.txt")) {
                                        GoodsPrice gp = null;

                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        while (null != (value = in.readLine())) {
                                                gp = new GoodsPrice();
                                                if(flag){
                                                        System.out.println("读取行数据   " + value);
                                                }
                                                if (gp.parse(value)) {
                                                        gpriceList.add(gp);
                                                }
                                        }
                                        if(flag){
                                                System.out.println("GoodsNum:——" + gpriceList.size());
                                        }
                                }

                                else if (path.toString().contains("tieredData.txt")) {
                                        System.out.println("开始导入供应商达量数据");
                                        TieredData td = null;

                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        while (null != (value = in.readLine())) {
                                                td = new TieredData();
                                                if(flag){
                                                        System.out.println("读取行数据" + value);
                                                }
                                                if (td.parse(value)) {
                                                        tieredDataList.add(td);
                                                }
                                        }
                                        if(flag){
                                                System.out.println("tieredDataNum:" + tieredDataList.size());
                                        }
                                }
                                // 新录入,优惠支出
                                else if (path.toString().contains("custom_preferential.txt")) {
                                        CustomPreferential cPrefer = null;
                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        while (null != (value = in.readLine())) {
                                                cPrefer = new CustomPreferential();
                                                // System.out.println("读取行数据"+value);
                                                if (cPrefer.parse(value)) {
                                                        cPreferList.add(cPrefer);
                                                        if(flag){
                                                                System.out.println("导入非达量优惠政策!!!!! 读入数据个数"
                                                                                + cPreferList.size());
                                                        }
                                                }
                                        }
                                        if(flag){
                                                System.out.println("custom_preferentialNum:"
                                                                + cPreferList.size());
                                        }
                                }
                                // 新录入,客户优惠梯度基本信息
                                else if (path.toString().contains("graded_preferential.txt")) {
                                        GradedPreferential gPrefer = null;
                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                       
//                                        if(flag){
//                                                System.out.println(" 导入客户达量优惠数据 ,value="+value);
//                                        }
                                        while (null != (value = in.readLine())) {
                                                gPrefer = new GradedPreferential();
                                                if (gPrefer.parse(value)) {
                                                        gPreferList.add(gPrefer);
                                                }
                                        }
                                        if(flag){
                                                System.out.println("graded_preferentialNum:"
                                                                + gPreferList.size());
                                        }
                                }
                                // 新录入,会员基本信息
                                else if (path.toString().contains("member_info.txt")) {
                                        Member member = null;
                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        if(flag||iflag){
                                                System.out.println(" 导入会员信息,value="+value);
                                        }
                                        while (null != (value = in.readLine())) {
                                                member = new Member();
                                                if (member.parse(value)) {
                                                        memberList.add(member);
                                                }
                                        }
                                }
                                // 新录入,积分信息
                                else if (path.toString().contains("Integral_info.txt")) {
                                        Integral integral = null;
                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        if(flag||iflag){
                                                System.out.println(" 导入积分规则信息,value="+value);
                                        }
                                        while (null != (value = in.readLine())) {
                                                integral = new Integral();
                                                if (integral.parse(value)) {
                                                        integralList.add(integral);
                                                }
                                        }
                                }
                                // 新录入,达量匹配规则信息
                                else if (path.toString().contains("Graded_info.txt")) {
                                        GradedData gradedData = null;
                                        in = new BufferedReader(new InputStreamReader(
                                                        new FileInputStream(path.toString().replace(
                                                                        "file:", "")), "GBK"));
                                        while (null != (value = in.readLine())) {
                                                gradedData = new GradedData();
                                                if (gradedData.parse(value)) {
                                                        gradedDataList.add(gradedData);
                                                }
                                        }
                                }
                        }
                } catch (IOException e) {
                        e.printStackTrace();
                } finally {
                        try {
                                in.close();
                        } catch (IOException e) {
                                e.printStackTrace();
                        }
                }
        }

        public void map(Object key, Text value, Context context)
                        throws IOException, InterruptedException {

                if (value == null || value.toString().equals("")) {
                        System.out.println("Value is NULL!! Value= " + value);
                        return;
                }

                String valueStr = new String(value.getBytes(), 0, value.getLength(),
                                "GBK");

                Trans tran = new Trans();
                if (!tran.parse(valueStr)) {    //这里parse方法将录入的数据,导入到tran对象中
                        // System.out.println("第 "+tran.i+" 条流水信息!!!");
                        System.out.println("Error: " + tran.toString());
                        outPutKey.set("error_" + key.toString());
                        outPutValue.set(valueStr);
                        context.write(outPutKey, outPutValue);
                        return;
                }

                // 计算供应商佣金
                 boolean hasSupTier = calculateSup(tran);
                // 计算客户非达量优惠支出
                 boolean preferNum = calculatePrefer(tran);    //就是这例如进入非达量优惠模块后,就会将所有数据一次性进行处理了,从而进入达量优惠处理模块时就会由于tran未空从而跳出
                // 计算达量优惠支出
                boolean gPreferNum = calculateGradedPrefer(tran);
                // 计算会员积分
                boolean integralNum = calculateIntegral(tran);

                if(!hasSupTier&&!preferNum&&!gPreferNum&&!integralNum){
                        outPutKey.set(randomStr);
                        outPutValue.set(tran.toString());
                        context.write(outPutKey, outPutValue);
                        return;
                }
        }// map




已有(2)人评论

跳转到指定楼层
langke93 发表于 2015-11-12 13:41:50
1.首先第一个问题
楼主的函数,确保集群都有相关包,否则根本调用不到,不止是hadoop jar的问题。而是在集群的每一台机器上都要加上相关包

2.第二个问题,楼主的调试确实挺麻烦的。

1.mapreduce调试分很多种:
一种是楼主的调试,MapReduce调试:在TaskTracker节点上查看打印信息
http://www.aboutyun.com/thread-13123-1-1.html
可以在TaskTracker直接查看。


2. context.write输出,这个可以直接在eclipse里查看
context.write(new Text("avg"), new LongWritable(scoreInt));


当然还有其它方法
相关内容推荐:
调试Hadoop源代码:eclipse调试及日志打印

hadoop的调试指导

Win7 Eclipse调试Centos Hadoop2.2-Mapreduce出现问题解决方案

大哥们,这个mapreduce如何调试啊
http://www.aboutyun.com/thread-5529-1-1.html




回复

使用道具 举报

Timmy 发表于 2015-11-13 16:44:31
谢谢,已经确认相关的包都是存在的,并且在注释屏蔽了第一个方法calculateSup(tran)后,其后面的一个方法calculatePrefer(tran)是可以调用的,但是这个方法之后calculateGradedPrefer(tran)方法就无法调用了,debug提示:(假设tran中有10条数据),tran的数据已经全部循环完毕;也就是说只要调用了上面四个方法的其中一个,tran数据就会被map全部循环一次;
       所以,问题是在于对于不同功能的方法,不能写在一个map中吗?
新问题:
1、  在hadoop2.x上运行程序时,观察程序运行状况的web界面是:http://  ip地址:8088 吗?
2、如果不对map进行设置,map就只会在NameNode节点上运行而不会分布式运行吗,哪该设置哪些呢?

谢谢!!!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条