分享

如何用mapreduce实现多字段的group by和order by

string2020 发表于 2013-12-14 09:34:55 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 33219
提示: 作者被禁止或删除 内容自动屏蔽

已有(6)人评论

跳转到指定楼层
rsgg03 发表于 2013-12-14 12:31:51
首先不知道你为什么这么做,下面给你一些思路。如果你想学习的话,建议使用hive,生成mapreduce,然后参考一下,它是怎么生成的。
既然你不想用hive,你只能自己写算法了。这个需要自己写函数,这个实现起来时非常麻烦的。只是提供 参考,这里只是抛砖引玉。1.如何分组:
你需要对字符串进行分割,并且不断比较。
2.如何排序:
简单的方法,你可以使用数组实现。如果不想用数组,那你只能自己写个数组,或则自己建立某个数据结构来实现了。

回复

使用道具 举报

string2020 发表于 2013-12-14 16:12:22
提示: 作者被禁止或删除 内容自动屏蔽
回复

使用道具 举报

lzw 发表于 2013-12-14 18:23:13
string2020 发表于 2013-12-14 16:12
假设我现在把每一行都用逗号分隔开了,
下一步怎么做。

我晚上给你写一个例子你看看,就明白了,其中主要考虑的是gorup中的key如何设计,设计好了自然就能解决
回复

使用道具 举报

lzw 发表于 2013-12-14 20:36:24
string2020 发表于 2013-12-14 16:12
假设我现在把每一行都用逗号分隔开了,
下一步怎么做。
  1. package com.duplicate;
  2. import java.io.IOException;
  3. import java.util.Random;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.FloatWritable;
  7. import org.apache.hadoop.io.NullWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. public class OdsEmp {
  15.         public static class Map extends Mapper<Object, Text, Text, Text>{
  16.                
  17.                 private Text k = new Text();
  18.                 private Text vs = new Text();
  19.                
  20.                 public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
  21.                         String v = value.toString();
  22.                         String[] token = v.split(",");
  23.                         k.set(token[0] + "," + token[1]); //使用 i_date + "," + v_emp_id组合key值
  24.                        
  25.                         vs.set(token[3] + "," + token[4]); // 使用 i_amr1 + "," + i_amr2组合value值
  26.                         context.write(k, vs);
  27.                 }
  28.         }
  29.        
  30.         public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{
  31.                
  32.                 public void reduce(Text key,Iterable<Text> values, Context context)throws IOException,InterruptedException{
  33.                         FloatWritable i_amr1  = new FloatWritable();
  34.                         FloatWritable i_amr2  = new FloatWritable();
  35.                        
  36.                         // 进行同一组的sum
  37.                         for(Text tx : values){
  38.                                 String[] split = tx.toString().split(",");
  39.                                 i_amr1.set( i_amr1.get() + Float.parseFloat(split[0]));
  40.                                 i_amr2.set(i_amr2.get() + Float.parseFloat(split[1]));
  41.                         }// End for
  42.                        
  43.                         Text value = new Text();
  44.                         value.set(key + "," + i_amr1 + "," + i_amr2); // 以i_date,v_emp_id,v_proj_id,i_amr1,i_amr2输出
  45.                         context.write(null, value);
  46.                 }
  47.         }
  48.        
  49.         /**
  50.          * @param args
  51.          * @throws IOException
  52.          * @throws ClassNotFoundException
  53.          * @throws InterruptedException
  54.          */
  55.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  56.                 // TODO Auto-generated method stub
  57.                 Configuration conf = new Configuration();
  58.                 Job job = new Job(conf,"OdsEmp");
  59.                 job.setJarByClass(OdsEmp.class);
  60.                 job.setMapperClass(Map.class);
  61.                 job.setReducerClass(Reduce.class);
  62.                
  63.                 job.setOutputKeyClass(NullWritable.class);
  64.                 job.setOutputValueClass(Text.class);
  65.                 FileInputFormat.addInputPath(job, new Path(args[0]));
  66.                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
  67.                 System.exit(job.waitForCompletion(true)?0:1);
  68.                
  69.         }
  70. }
复制代码
上面代码是对应group 和sum的实现代码,对排序将上面job执行输出值,再做一次mapreduce进行排序。
回复

使用道具 举报

string2020 发表于 2013-12-14 20:43:59
提示: 作者被禁止或删除 内容自动屏蔽
回复

使用道具 举报

lzw 发表于 2013-12-14 20:51:09
string2020 发表于 2013-12-14 20:43
谢谢,太感谢了。

有点思路了,关键是map输出的时候,key要是group by的字段组合,value就是要求和的 ...

但是后面排序也是很关键的,我这里还在思考中。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条