分享

使用hadoop mapreduce实现商品统计关联度


问题导读

1.如何使用mapreduce实现商品统计关联度?
2.使用mapreduce实现商品统计关联度实现的思路是什么?
3.通过map函数实现什么功能?
4.reduce在商品统计关联度中的作用是什么?







最近几天一直在看hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。

需求描述:
根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。

数据格式:
超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:


1.jpg


需求分析:
采用hadoop中的mapreduce对该需求进行计算。
map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:

2.png

这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。
reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:
通过map函数的处理,得到如下图所示的记录:

3.png


reduce中对map输出的value值进行分组计数,得到的结果如下图所示

4.png

将商品A B作为key,组合个数作为value输出,输出结果如下图所示:

5.jpg

对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现

代码实现:
关于代码就不做详细的介绍,具体参照代码之中的注释吧。


[mw_shl_code=java,true]package com;  
  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map.Entry;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
public class Test extends Configured implements Tool{  
  
    /**
     * map类,实现数据的预处理
     * 输出结果key为商品A value为关联商品B
     * @author lulei
     */  
    public static class MapT extends Mapper<LongWritable, Text, Text, Text> {  
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{  
            String line = value.toString();  
            if (!(line == null || "".equals(line))) {  
                //分割商品  
                String []vs = line.split(",");  
                //两两组合,构成一条记录  
                for (int i = 0; i < (vs.length - 1); i++) {  
                    if ("".equals(vs)) {//排除空记录  
                        continue;  
                    }  
                    for (int j = i+1; j < vs.length; j++) {  
                        if ("".equals(vs[j])) {  
                            continue;  
                        }  
                        //输出结果  
                        context.write(new Text(vs), new Text(vs[j]));  
                        context.write(new Text(vs[j]), new Text(vs));  
                    }  
                }  
            }   
        }  
    }  
      
    /**
     * reduce类,实现数据的计数
     * 输出结果key 为商品A|B value为该关联次数
     * @author lulei
     */  
    public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {  
        private int count;  
         
        /**
         * 初始化
         */  
        public void setup(Context context) {  
            //从参数中获取最小记录个数  
            String countStr = context.getConfiguration().get("count");  
            try {  
                this.count = Integer.parseInt(countStr);  
            } catch (Exception e) {  
                this.count = 0;  
            }  
        }  
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{  
            String keyStr = key.toString();  
            HashMap<String, Integer> hashMap = new HashMap<String, Integer>();  
            //利用hash统计B商品的次数  
            for (Text value : values) {  
                String valueStr = value.toString();  
                if (hashMap.containsKey(valueStr)) {  
                    hashMap.put(valueStr, hashMap.get(valueStr) + 1);  
                } else {  
                    hashMap.put(valueStr, 1);  
                }  
            }  
            //将结果输出  
            for (Entry<String, Integer> entry : hashMap.entrySet()) {  
                if (entry.getValue() >= this.count) {//只输出次数不小于最小值的  
                    context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue()));  
                }  
            }  
        }  
    }  
      
    @Override  
    public int run(String[] arg0) throws Exception {  
        // TODO Auto-generated method stub  
        Configuration conf = getConf();  
        conf.set("count", arg0[2]);  
         
        Job job = new Job(conf);  
        job.setJobName("jobtest");  
         
        job.setOutputFormatClass(TextOutputFormat.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
         
        job.setMapperClass(MapT.class);  
        job.setReducerClass(ReduceT.class);  
         
        FileInputFormat.addInputPath(job, new Path(arg0[0]));  
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));  
         
        job.waitForCompletion(true);  
         
        return job.isSuccessful() ? 0 : 1;  
         
    }  
      
    /**
     * @param args
     */  
    public static void main(String[] args) {  
        // TODO Auto-generated method stub  
        if (args.length != 3) {  
            System.exit(-1);  
        }  
        try {  
            int res = ToolRunner.run(new Configuration(), new Test(), args);  
            System.exit(res);  
        } catch (Exception e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  
  
}  [/mw_shl_code]

上传运行:

将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。
命令运行截图如下图所示:

6.png

运行结束后查看相应的HDFS文件系统,如下图所示:

7.jpg

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~





来源:xiaojimanman





已有(4)人评论

跳转到指定楼层
arBen 发表于 2015-6-10 08:39:29
感谢楼主分享。一起学习。
回复

使用道具 举报

tang 发表于 2015-6-10 10:06:21
回复

使用道具 举报

hery 发表于 2015-6-10 10:26:57
感谢楼主分享
回复

使用道具 举报

levycui 发表于 2015-6-10 16:54:22
刚刚看了开头,就来评论,楼主写的很好,初学必备。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条