分享

实际难题,MapReduce适合两两计算的场景吗问题

mexiang 发表于 2013-10-26 15:11:10 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 8685
有一个大文件,文件中存储的是用户后买过的商品信息,文件的内容如下:
用户ID1,商品1,商品2,......,商品n
用户ID2,商品1,商品3,......,商品m
......
现在我要求高手购买共同商品最多的两个用户,用MapReduce该咋做呢吗?
我的简单思路是首先对每两个用户,求高手出购买的共同商品的数量,得到结果: 数量_用户x_用户y;
然后对结果进行排序,找到最大的那个,截取两个用户的ID,
但是不知道运用MapReduce该咋做,请大侠们帮帮忙吗?这个适合MapReduce解吗?

已有(7)人评论

跳转到指定楼层
skaterxu 发表于 2013-10-26 15:11:10
答复: 可以运用MapReduce实现。
这样的还是比较典型的,例如在推荐系统中,计算一个用户和其他用户的相似性,比较两个病毒样本的特征 。。。etc.

先看看不运用MapReduce咋实现。
1. 用户商品的关系可以拆分为 商品用户的关系,有了这两个关系就可以快速查找一个用户和其他用户的关系(这个关系是通过商品体现的)如上图
2. 拆分完后只要迭代用户的每条信息,找到用户购买的商品,在根据商品用户信息查找到此用户和其他用户的关系。
具体运行的代码
[ol] import time
import operator

def analyze_data():
    users_file = open("users.txt","r+")
    goods_file = open("goods.txt","r+")
    users_dict = makeDict(users_file)
    goods_dict = makeDict(goods_file)
    analyze_user(users_dict)
    analyze_goods(goods_dict)
    analyze_total(users_dict,goods_dict)
   
def find_same_key(user1,user2,dict):
    keys = dict.iterkeys()
    for k in keys:
        try:
            dict[user1+"_"+user2]
            return user1+"_"+user2
        except Exception:
            try:
                dict[user2+"_"+user1]
                return user2+"_"+user1
            except Exception:
                return None

        
def analyze_total(dict_user,dict_goods):
    keys = dict_user.iterkeys()
    total_dict = {}
    for user in keys:
        goods_list = dict_user[user]
        for goods in goods_list:
            user_list = dict_goods[goods]
            ''' exclusive the single user '''
            if len(user_list) == 1:
                continue
            
            for other in user_list:
                if other != user:
                    k = find_same_key(user,other,total_dict)
                    if k:
                        num = total_dict[k]
                        num += 1
                        total_dict[k] = num
                    else:
                        ''' first time '''
                        total_dict[user+"_"+other] = 1
    print "total record of the result  ", len(total_dict)
    li =  sorted([(y,x) for x,y in total_dict.items()], reverse=True)   
    get_top_n(li ,10)
   
''' get the top n user '''   
def get_top_n(li,top_n):
    for line in li:
        if top_n > 0:
            print line
        top_n -= 1
   
def analyze_user(dict):
    ''' analyze the user's data '''
    print "....................... user ........................."
    keys = dict.iterkeys()
    li = []
    for key in keys:
        li.append(len(dict[key]))
    li.sort()
    total = 0
    for line in li:
        total += line
    print "total users is  : ",len(dict)
    print "min goods count of user : ",li[0]
    print "max goods count of user : ",li[len(li)-1]
    print "total goods count of user : ",total
    print "avrage goods count of user : ",total/len(li)
    print "mean goods number is : ",li[int(len(li)/2)]
   
def analyze_goods(dict):
    ''' analyze the goods's data '''
    print ".................. goods ........................"
    keys = dict.iterkeys()
    li = []
    for key in keys:
        li.append(len(dict[key]))
    li.sort()
    total = 0
    for line in li:
        total += line
   
    print "total goods is  : ",len(dict)
    print "min users count of goods : ",li[0]
    print "max users count of goods : ",li[len(li)-1]
    print "total users count of goods : ",total
    print "avrage users count of goods : ",total/len(li)
    print "mean users number is : ",li[int(len(li)/2)]


''' construct a dict object '''   
def makeDict(file):
    dict = {}
    for line in file:
        arr = line.replace("\n","").split("\t")
        i = 0
        temp_list = []
        key = ""
        for record in arr:
            i += 1
            if i == 1:
                key = str(record)
            else:
                temp_list.append(record)
        dict[key] = temp_list
    return dict
if __name__ == "__main__":
    start_time = time.time()
    analyze_data()
    # sort_dict()
    print "total execute time is : ",(time.time() - start_time)[/ol]复制代码
测试数据:
....................... user .........................
total users is  :  1846
min goods count of user :  18
max goods count of user :  3244
total goods count of user :  510959
avrage goods count of user :  276
mean goods number is :  173
.................. goods ........................
total goods is  :  26555
min users count of goods :  1
max users count of goods :  1235
total users count of goods :  510959
avrage users count of goods :  19
mean users number is :  3
total record of the result   1980983
(2294, '2482807_tjz230')
(2246, '46921865_3459184') [次数,用户_用户]
(2210, '46921865_2482807')
(2173, 'GOAL_goal')
(2120, '46921865_tjz230')
(2108, '36855984_tjz230')
(2036, '46921865_nofish')
(2026, '2482807_36855984')
(1956, '2482807_3459184')
(1934, 'nofish_tjz230')
total execute time is :  328.155999899
备注: 一共 1846 个用户运行花费了 328 秒 ,26555个商品,  产生了 1,980,983 条数据,文件大小为 6M。
由上边的测试数据来看运行这样的操作还是比较耗费时间的,运用更高性能的机器(CPU更高,内存更大),或运用分布式系统是这种的两个解决方案。
回复

使用道具 举报

lijian123841314 发表于 2013-10-26 15:11:10
好,我帮着顶一下吧,暂时没方案,期待
回复

使用道具 举报

ruanhero 发表于 2013-10-26 15:11:10
运用MapReduce的解决思路为把 商品用户信息放到每个待运行的MapReduce 的TaskTracker服务区上。
分解每一条用户商品记录,然后和TaskTracker中的商品用户信息进行查找,存储信息,最后得出结果。
1. 如果商品用户信息可以放到内存中这样的处理方式最快,如果不能放到内存中只能运用文件查找的方式这样就比较慢了。
解决这种的思路为: 先用单服务器看看咋解决,然后在放到分布式环境下看怎样对数据进行划分。
如果在单服务器下都没解决方案,那在分布式环境下更会一头雾水。
回复

使用道具 举报

mexiang 发表于 2013-10-26 15:11:10
(2294, '2482807_tjz230')
(2246, '46921865_3459184') [次数,用户_用户]
(2210, '46921865_2482807')
(2173, 'GOAL_goal')
(2120, '46921865_tjz230')
(2108, '36855984_tjz230')
(2036, '46921865_nofish')
(2026, '2482807_36855984')
(1956, '2482807_3459184')
(1934, 'nofish_tjz230')
这些数据要除以2。
回复

使用道具 举报

dgxl 发表于 2013-10-26 15:11:10
给出一种MapReduce的实现[ol]
  • package com.netqin.examples;
  • /*
  • * author:biansutao
  • * email:biansutao[at]gmail dot com
  • * createDate:2011-11-23
  • */
  • import java.io.BufferedReader;
  • import java.io.FileReader;
  • import java.io.IOException;
  • import java.net.URI;
  • import java.util.HashMap;
  • import java.util.Map;
  • import org.apache.hadoop.conf.Configuration;
  • import org.apache.hadoop.filecache.DistributedCache;
  • import org.apache.hadoop.fs.Path;
  • import org.apache.hadoop.io.IntWritable;
  • import org.apache.hadoop.io.Text;
  • import org.apache.hadoop.mapreduce.Job;
  • import org.apache.hadoop.mapreduce.Mapper;
  • import org.apache.hadoop.mapreduce.Reducer;
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  • import org.apache.hadoop.util.GenericOptionsParser;
  • public class UserGoodsCounter {
  •     public static class  StatsMapper extends
  •             Mapper {
  •         private final static IntWritable countValue = new IntWritable(1);
  •         private Text userKey = new Text();
  •         private Map hashMap = new HashMap();
  •         protected void setup(Context context) throws IOException,
  •                 InterruptedException {
  •             try {
  •                 FileReader reader = new FileReader("goods.txt");
  •                 BufferedReader br = new BufferedReader(reader);
  •                 String line = null;
  •                 while ((line = br.readLine()) != null) {
  •                     String[] arr = line.split("\t");
  •                     hashMap.put(arr[0],line);
  •                 }
  •                 System.out.println("setUp :  total  "+hashMap.size()+" records");
  •                 br.close();
  •                 reader.close();
  •             } catch (Exception e) {
  •                 e.printStackTrace();
  •             }
  •         }
  •         
  •         public void map(Object key, Text value, Context context)
  •                 throws IOException, InterruptedException {
  •                 String line = value.toString();
  •                 String[] arr = line.split("\t");
  •                 String user = arr[0];
  •                 int len = arr.length;
  •                 System.out.println("now start process user "+user);
  •                 for(int i=1;i {
  •         private IntWritable result = new IntWritable();
  •         public void reduce(Text key, Iterable[I] values,
  •                 Context context) throws IOException, InterruptedException {
  •                 int sum = 0;
  •             for (IntWritable val : values) {
  •                 sum += val.get();
  •             }
  •             
  •             result.set(sum);
  •             context.write(key, result);
  •         }
  •     }
  •     public static void main(String[] args) throws Exception {
  •         Configuration conf = new Configuration();
  •         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  •         if (otherArgs.length != 2) {
  •             System.err.println("Usage:  hadoop jar    ");
  •             System.exit(2);
  •         }
  •         
  •         DistributedCache.createSymlink(conf);
  •         String path = "/tmp/goods.txt";
  •         Path filePath = new Path(path);
  •         String uriWithLink = filePath.toUri().toString() + "#" + "goods.txt";
  •         DistributedCache.addCacheFile(new URI(uriWithLink), conf);
  •         
  •         Job job = new Job(conf, "User Goods Stats");
  •         job.setJarByClass(UserGoodsCounter.class);
  •         job.setMapperClass(StatsMapper.class);
  •         job.setCombinerClass(StatsReducer.class);
  •         job.setReducerClass(StatsReducer.class);
  •         job.setOutputKeyClass(Text.class);
  •         job.setOutputValueClass(IntWritable.class);
  •         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  •         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  •         System.exit(job.waitForCompletion(true) ? 0 : 1);
  •     }
  • }[/ol]复制代码
  • 回复

    使用道具 举报

    xiaolongwu1987 发表于 2013-10-26 15:11:10
    统计出来的结果包括重复的数据 。
    例如A用户和B用户。
    A_B 10
    B_A 10
    结果数据在去重就可以了,这个可以在单机做也可以。
    回复

    使用道具 举报

    lovejunxia 发表于 2013-10-26 15:11:10
    比较相似度,比较赞同,不知道算法性能怎样!
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    关闭

    推荐上一条 /2 下一条