分享

hadoop程序转为Spark程序遇到的问题

本帖最后由 梦回三国 于 2014-12-3 22:54 编辑

现有简单的hadoop程序,想将其转为spark程序,对于hadoop的map函数的转换没有什么问题,但是对于reduce的转换却出现问题,本人使用spark的groupby函数转换后使用map函数处理,但是转换后的数据量明显比reduce的数据量少了很多。这是为什么呢,有没有大神遇到这种情况呢?
这是hadoop的reduce函数:

                public static class IntSumReducer extends
                                Reducer<Text, Text, Text, Text>
                {

                        private Text word = new Text();
                        private Text outvals = new Text();
                        
                        public void reduce(Text key, Iterable<Text> values,
                                        Context context) throws IOException, InterruptedException
                        {
                               count++;

                                Iterator val1 =values.iterator();
                                int sum = 0,k=7
                                String temp="";
                                List<String> temp1=new ArrayList<String>();
                                for (Text val : values){
                                        sum++;
                                        temp1.add(val.toString());
                                        temp+=val.toString()+";";
                                }
                                if(sum>k){
                                        for(int i=0;i<temp1.size();i++){
                                           word.set(temp1.get(i).substring(0,11));
                                           outvals.set(temp1.get(i).substring(12,temp1.get(i).length())+"="+key.toString());
                                           context.write(word, outvals);        
                                          count1++;
                                        }
                        }
                }


下面是spark操作:
mapreduce1是需要reduce的RDD。count2的数量跟hadoop的map处理的数据量是一致的,这一点没问题。但是使用groupby处理后数据量count3明显比count少了。加了条件后count4比hadoop的加了条件处理的数据量count1少的更多,压根不在一个数量级上。


                long count2 = mapreduce1.count();
                System.out.println(count2);
               
                JavaPairRDD<String, Iterable<String>> mapreduce2=mapreduce1.groupByKey();
                long count3 = mapreduce2.count();
                System.out.println(count3);


                mapreduce2=mapreduce2.filter(new Function<Tuple2<String, Iterable<String>>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<String, Iterable<String>> tuple)
                                        throws Exception {
                                int sum = 0, k = 7;
                                for (String val : tuple._2()) {
                                        sum++;
                                }
                                if (sum > k) {
                                        return true;
                                }
                                return false;
                        }
                });
                long count4 = mapreduce2.count();
                System.out.println(count4);


问题已解决,业务逻辑的问题


已有(10)人评论

跳转到指定楼层
dulei 发表于 2014-12-3 09:48:39
hadoop程序转为Spark程序,学习学习
回复

使用道具 举报

sstutu 发表于 2014-12-3 12:21:03


对于spark与hadoop处理的同一数据,但是由于spark集群有自己的配置和机制,hadoop也有自己的配置和机制,所以同一数据,他们的无论是map数量,还是reduce数量,都是本集群配置相关的。即使是hadoop集群,同一数据,不同的集群,mapreduce也会有所不一样。
回复

使用道具 举报

梦回三国 发表于 2014-12-3 13:56:37
sstutu 发表于 2014-12-3 12:21
对于spark与hadoop处理的同一数据,但是由于spark集群有自己的配置和机制,hadoop也有自己的配置和机制 ...

对于每一个map和reduce来说确实跟集群有关,但是我的是累加器呀,最后按key的累加数量应该一样吧,不然最后结果也是不一样的啊,我的最后处理结果hadoop比spark高出了差不多有一个数量级,不知道为什么?
回复

使用道具 举报

w123aw 发表于 2014-12-3 14:44:09

怎么算的那,不过
前后有的count和count1,应该可以的,楼主可以细心算算

1.png

回复

使用道具 举报

梦回三国 发表于 2014-12-3 15:02:01
w123aw 发表于 2014-12-3 14:44
怎么算的那,不过
前后有的count和count1,应该可以的,楼主可以细心算算

这是两个计数器呢,前一个count是计算有多少分组的,后一个是计算满足条件的。结果这俩跟spark的都不一样
回复

使用道具 举报

sstutu 发表于 2014-12-3 16:03:08
能把你的计算过程说一下,这样才能更好的发现问题。
你的count是不是都是在单个reduce中,下面参考下,可能具有一些参考意义:


如果你的数据分割是一样的,key、value是一样的,这二者肯定是不一样的。


Hadoop保持记录
Spark 100 TB
Spark 1 PB
数据大小
102.5 TB
102 TB
1000 TB
耗时
72分钟
23分钟
234分钟
节点数
2100
206
190
# Cores
50400
6592
6080
# Reducers
10,000
29,000
250,000
Rate
1.42 TB/min
4.27 TB/min
4.27 TB/min
Rate/node
0.67 GB/min
20.7 GB/min
22.5 GB/min
Daytona Gray类别排序基准规则
环境
专用的数据中心
EC2 (i2.8xlarge)
EC2 (i2.8xlarge)


参考:http://www.aboutyun.com/thread-10412-1-1.html





回复

使用道具 举报

梦回三国 发表于 2014-12-3 16:23:24
sstutu 发表于 2014-12-3 16:03
能把你的计算过程说一下,这样才能更好的发现问题。
你的count是不是都是在单个reduce中,下面参考下,可 ...

首先我都是在单机上运行的,其次我的程序是这样的:
hadoop程序:
public static class TokenizerMapper extends
                                Mapper<Object, Text, Text, Text>
                {


                        private Text word = new Text();
                        private Text val = new Text();

                        public void map(Object key, Text value, Context context)
                                        throws IOException, InterruptedException
                        {
                                StringTokenizer itr = new StringTokenizer(value.toString());
                                String aa=value.toString();
                                String []b=aa.split("\t");
                                if((b.length>10)&&(b[1].length()>9)&&(b[3].length()>6)&&(b[9].length()>17)){
                                        word.set(b[3]);// che pai hao
                                        val.set(b[1]+"="+b[9])
                                        context.write(word, val);
                                        count++;
                                }
                        }
                }

                public static class IntSumReducer extends
                                Reducer<Text, Text, Text, Text>
                {

                        private Text word = new Text();
                        private Text outvals = new Text();
                       
                        public void reduce(Text key, Iterable<Text> values,
                                        Context context) throws IOException, InterruptedException
                        {
                                count1++;
                                Iterator val1 =values.iterator();
                                int sum = 0,k=7;// k wei jian ce dian yu zhi
                                String temp="";
                                List<String> temp1=new ArrayList<String>();
                                for (Text val : values){
                                        sum++;
                                        temp1.add(val.toString());
                                        temp+=val.toString()+";";
                                }
                                if(sum>k){
                                        for(int i=0;i<temp1.size();i++){
                                                count2++;
                                           if(temp1.get(i).length()<30)
                                                        continue;
                                           word.set(temp1.get(i).substring(0,11));
                                           outvals.set(temp1.get(i).substring(12,temp1.get(i).length())+"="+key.toString());
                                           context.write(word, outvals);       
                                        }
                                }

                        }
                }


将其转为spark程序是这样的:
                JavaSparkContext jsc = new JavaSparkContext(sparkconf);
                JavaPairRDD<String, String> mapreduce1 = jsc
                .textFile("D://1101.1")
                .filter(new Function<String, Boolean>() {
                        @Override
                        public Boolean call(String s) throws Exception {
                                String[] str = s.split("\t");
                                if (str.length > 10 && str[1].length() > 9
                                                && str[3].length() > 6 && str[9].length() > 17) {
                                        return true;
                                }
                                return false;
                        }
                })
                .mapToPair(new PairFunction<String, String, String>() {
                        @Override
                        public Tuple2<String, String> call(String s)
                                        throws Exception {
                                String[] str = s.split("\t");
                                return new Tuple2<String, String>(str[3], str[1] + "="
                                                + str[9]);
                        }
                });
                long count = mapreduce1.count();
                System.out.println(count);
               
                JavaPairRDD<String, Iterable<String>> mapreduce2=mapreduce1.groupByKey();
                long count1 = mapreduce2.count();
                System.out.println(count1);       
               
                mapreduce2=mapreduce2.filter(new Function<Tuple2<String, Iterable<String>>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<String, Iterable<String>> tuple)
                                        throws Exception {
                                int sum = 0, k = 7;
                                for (String val : tuple._2()) {
                                        sum++;
                                }
                                if (sum > k) {
                                        return true;
                                }
                                return false;
                        }
                });
                long count2 = mapreduce2.count();
                System.out.println(count2);
               
                JavaRDD<String> mapreduce3 = mapreduce2.map(new Function<Tuple2<String,Iterable<String>>, String>() {
                        @Override
                        public String call(Tuple2<String, Iterable<String>> tuple)
                                        throws Exception {
                                String str = "";
                                for (String val : tuple._2()) {
                                        str = val.substring(0, 11)+"        "+val.substring(12, val.length())+ "=" + tuple._1().toString();
                                }
                                return str;
                        }
                });
                long count3 = mapreduce3.count();
                System.out.println(count3);


                mapreduce3.saveAsTextFile("D://output//temp1");

count1,2,3……等是为了比较处理的数据量,最后hadoop程序跟spark程序得出的结果不一致,数据量不在同一个级别上,按理说应该结果一样才对呀。
回复

使用道具 举报

sstutu 发表于 2014-12-3 23:40:51
梦回三国 发表于 2014-12-3 16:23
首先我都是在单机上运行的,其次我的程序是这样的:
hadoop程序:
public static class TokenizerMappe ...
他们一样的概率是非常小的。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条