分享

Hadoop 多表 join:map side join 范例


问题导读

1.map side join适用于那些情况?
2.使用reduce side join的原因是什么?
3.使用reduce side join存在什么缺点?







在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如有最常见的 reduce side join,map side join,semi join 等。今天我们要讨论的是第 2 种:map side join,这种 join 在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,会造成大量的网络IO,效率低下。
1、原理:
      之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
2、环境:
本实例需要的测试文件及 hdfs 文件存放目录如下:
hadoop fs -ls /test/decli
Found 4 items
-rw-r--r--   2 root supergroup        152 2013-03-06 02:05 /test/decli/login
drwxr-xr-x   - root supergroup          0 2013-03-06 02:45 /test/decli/output
-rw-r--r--   2 root supergroup         12 2013-03-06 02:12 /test/decli/sex
-rw-r--r--   2 root supergroup         72 2013-03-06 02:44 /test/decli/user

测试文件内容分别为:
root@master 192.168.120.236 02:58:03 ~/test/table >
cat login  # 登录表,需要判断 uid 列是否有效,并得到对应用户名、性别、访问次数
1       0       20121213
2       0       20121213
3       1       20121213
4       1       20121213
1       0       20121114
2       0       20121114
3       1       20121114
4       1       20121114
1       0       20121213
1       0       20121114
9       0       20121114
root@master 192.168.120.236 02:58:08 ~/test/table >
cat sex # 性别表
0       男
1       女
root@master 192.168.120.236 02:58:13 ~/test/table >
cat user # 用户属性表
1       张三    hubei
3       王五    tianjin
4       赵六    guangzhou
2       李四    beijing
root@master 192.168.120.236 02:58:16 ~/test/table >
测试环境 hadoop 版本:
  1. echo $HADOOP_HOME
  2. /work/hadoop-0.20.203.0
复制代码
好了,废话少说,上代码:
3、代码:
  1. import java.io.BufferedReader;
  2. import java.io.FileReader;
  3. import java.io.IOException;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.conf.Configured;
  8. import org.apache.hadoop.filecache.DistributedCache;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  19. import org.apache.hadoop.util.GenericOptionsParser;
  20. import org.apache.hadoop.util.Tool;
  21. import org.apache.hadoop.util.ToolRunner;
  22. public class MultiTableJoin extends Configured implements Tool {
  23.     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
  24.         // 用于缓存 sex、user 文件中的数据
  25.         private Map<String, String> userMap = new HashMap<String, String>();
  26.         private Map<String, String> sexMap = new HashMap<String, String>();
  27.         private Text oKey = new Text();
  28.         private Text oValue = new Text();
  29.         private String[] kv;
  30.         // 此方法会在map方法执行之前执行
  31.         @Override
  32.         protected void setup(Context context) throws IOException,
  33.                 InterruptedException {
  34.             BufferedReader in = null;
  35.             try {
  36.                 // 从当前作业中获取要缓存的文件
  37.                 Path[] paths = DistributedCache.getLocalCacheFiles(context
  38.                         .getConfiguration());
  39.                 String uidNameAddr = null;
  40.                 String sidSex = null;
  41.                 for (Path path : paths) {
  42.                     if (path.toString().contains("user")) {
  43.                         in = new BufferedReader(new FileReader(path.toString()));
  44.                         while (null != (uidNameAddr = in.readLine())) {
  45.                             userMap.put(uidNameAddr.split("\t", -1)[0],
  46.                                     uidNameAddr.split("\t", -1)[1]);
  47.                         }
  48.                     } else if (path.toString().contains("sex")) {
  49.                         in = new BufferedReader(new FileReader(path.toString()));
  50.                         while (null != (sidSex = in.readLine())) {
  51.                             sexMap.put(sidSex.split("\t", -1)[0], sidSex.split(
  52.                                     "\t", -1)[1]);
  53.                         }
  54.                     }
  55.                 }
  56.             } catch (IOException e) {
  57.                 e.printStackTrace();
  58.             } finally {
  59.                 try {
  60.                     if (in != null) {
  61.                         in.close();
  62.                     }
  63.                 } catch (IOException e) {
  64.                     e.printStackTrace();
  65.                 }
  66.             }
  67.         }
  68.         public void map(LongWritable key, Text value, Context context)
  69.                 throws IOException, InterruptedException {
  70.             kv = value.toString().split("\t");
  71.             // map join: 在map阶段过滤掉不需要的数据
  72.             if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
  73.                 oKey.set(userMap.get(kv[0]) + "\t" + sexMap.get(kv[1]));
  74.                 oValue.set("1");
  75.                 context.write(oKey, oValue);
  76.             }
  77.         }
  78.     }
  79.     public static class Reduce extends Reducer<Text, Text, Text, Text> {
  80.         private Text oValue = new Text();
  81.         public void reduce(Text key, Iterable<Text> values, Context context)
  82.                 throws IOException, InterruptedException {
  83.             int sumCount = 0;
  84.             for (Text val : values) {
  85.                 sumCount += Integer.parseInt(val.toString());
  86.             }
  87.             oValue.set(String.valueOf(sumCount));
  88.             context.write(key, oValue);
  89.         }
  90.     }
  91.     public int run(String[] args) throws Exception {
  92.         Job job = new Job(getConf(), "MultiTableJoin");
  93.         job.setJobName("MultiTableJoin");
  94.         job.setJarByClass(MultiTableJoin.class);
  95.         job.setMapperClass(MapClass.class);
  96.         job.setReducerClass(Reduce.class);
  97.         job.setInputFormatClass(TextInputFormat.class);
  98.         job.setOutputFormatClass(TextOutputFormat.class);
  99.         job.setOutputKeyClass(Text.class);
  100.         job.setOutputValueClass(Text.class);
  101.         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
  102.                 args).getRemainingArgs();
  103.         // 我们把第1、2个参数的地址作为要缓存的文件路径
  104.         DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job
  105.                 .getConfiguration());
  106.         DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job
  107.                 .getConfiguration());
  108.         FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
  109.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[4]));
  110.         return job.waitForCompletion(true) ? 0 : 1;
  111.     }
  112.     public static void main(String[] args) throws Exception {
  113.         int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),
  114.                 args);
  115.         System.exit(res);
  116.     }
  117. }
复制代码
运行命令:
  1. hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output
复制代码

4、结果:

运行结果:
root@master 192.168.120.236 02:47:18 ~/test/table >
hadoop fs -cat /test/decli/output/*|column -t
cat: File does not exist: /test/decli/output/_logs
张三  男  4
李四  男  2
王五  女  2
赵六  女  2
root@master 192.168.120.236 02:47:26 ~/test/table >



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条