分享

MapReduce 基础算法程序设计(2)

韩克拉玛寒 发表于 2015-3-6 20:49:35 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 25043
本帖最后由 pig2 于 2015-3-28 00:32 编辑
问题导读:

1、MapReduce如何在关系代数的运算上发挥作用?
2、交运算思想是什么?
3、自然连接操作Map端如何实现?



上一篇:MapReduce 基础算法程序设计(1)

关系代数运算
         MapReduce可以再关系代数的运算上发挥重要作用,因为关系代数运算具有数据相关性低的特性,这使得其便于进行MapReduce的并行化算法设计。
         常见的关系代数运算包括选择、投影、并、交、差以及自然连接操作等。都可以十分容易低利用MapReduce来进行并行化。下面介绍几种基于MapReduce的关系代数运算算法实现。首先下面两张表(见表1和表2)作为例子说明不同关系运算的流程:
表1   关系R
  
ID
  
NAME
AGE
GRADE

ID
NAME
AGE
GRADE
1
张小雅
20
91
3
李婷
21
82
2
刘伟
19
87
4
孙强
20
95
表2   关系S
  
ID
  
GENDER
HEIGHT

ID
GENDER
HEIGHT
1
165
3
170
2
178
4
175

         首先我们定义相应的类Relation来记录一条关系关系R或关系S中的数据。具体实现时会设计到MapReduce计算框架从关系数据库获得数据的问题,这里我们暂且假设我们的输入数据存储在文本文件中。
1、选择操作
      对于关系R应用条件C,例如在一张学生学号、姓名和成绩的关系表中查询分数大于90分的学生。我们只需要Map阶段对于每个输入的记录判断是否满足条件,将满足条件的记录Rc输出即可,输出键值对为(Rc,null),Reduce阶段无需做额外的工作。例如下面的代码就是找出关系R中所有属性号为id、属性值为value的项并输出:
                  Mapper实现代码如下:
  1. public classChoiceOptionAlgorithm {
  2.    /**
  3.     * 选择操作
  4.     */
  5.    //  Mapper实现代码如下:
  6.    public static class SelectionMap extends Mapper<LongWritable,Text, RelationoA, NullWritable> {
  7.       private int id;
  8.       private String value;
  9.       
  10.       @Override
  11.       //读入用户设置的条件
  12.       protected void setup(Context context)
  13.              throws java.io.IOException,InterruptedException {
  14.           id =context.getConfiguration().getInt("col", 0);
  15.           value =context.getConfiguration().get("value");
  16.       };
  17.       @Override
  18.       //扫描表,找到满足条件的记录发送出去
  19.       protected void map(LongWritableoffSet, Text line, Context context) throws java.io.IOException,InterruptedException {
  20.           RelationoArecord = newRelationoA(line.toString());
  21.           if(record.isCondition(id,value)){
  22.              context.write(record,NullWritable.get());
  23.           }
  24.       };
  25.    }
  26. }
  27. class  RelationoA{
  28.      .....
  29.    public RelationoA(Stringstring) {
  30.       // TODO Auto-generated constructor stub
  31.    }
  32.    public boolean isCondition(int id, String value) {
  33.       // TODO Auto-generated method stub
  34.       return false;
  35.    }
  36. }
复制代码


Map端代码只需要将满足条件的数据记录项输出即可,所以只有键没有值。而Reduce端不需要做任何事情。这里具体实现时我们不需要去写Reduce端代码,因为在MapReduce执行的过程中,其会生成一个系统自带的Reduce,这个Reduce是MapReduce为了保持框架的完整性自动调用的,它与我们自定义的Reduce不同的是,这个Reduce不会执行shuffle和数据传送,其输出的文件就是Map端输出的文件。当然我们也可以通过将Reduce的数目设为0来实现。

2、投影操作
         例如在关系R上应用投影操作获得属性AGE的所有值,我们只需要在Map阶段将每条记录在该属性上的值作为键输出即可。此时对应该键的值为MapReduce一个自定义类型NullWritable的一个对象。而在Reduce端我们仅仅将Map端输入的键输出即可。注意,此时投影操作具有去重的功能,例如在此例子中我们会获得20,19,21 三个结果。
Mapper实现代码如下:

   
  1. //获得指定列上的值并将其发送出去
  2.    public static class ProjectionMap extends Mapper<LongWritable,Text, Text, NullWritable>{
  3.       private int col;
  4.       //获取用户设置的列号
  5.       protected void setup(Context context) throws java.io.IOException,InterruptedException {
  6.           //获得投影操作属性的列号
  7.           col=context.getConfiguration().getInt("col", 0);
  8.       }
  9.       protected void map(LongWritable key,Text line, Context context) throws java.io.IOException ,InterruptedException {
  10.           RelationoArecord = newRelationoA(line.toString());
  11.           context.write(newText(record.getCol(col)),NullWritable.get());     
  12.       }
  13.    }
复制代码


Reduce端实现代码如下:
  1. public staticclassProjectionReduce extends Reducer<Text,  NullWritable,Text, NullWritable>{
  2.       //reduce端不需要做任何工作
  3.       protected void reduce(Text key,Iterable<NullWritable> value, Context context) throws java.io.IOException,InterruptedException {
  4.           context.write(key,NullWritable.get());
  5.       }
  6.    }
复制代码


3、交运算
         获得两张表的交集的主要思想如下:如果有一个关系T和关系R为同一模式,我们希望会的R和T的交集,那么在Map阶段我们对于R和T中的每一条数据记录r输出 (r,1),在Reduce阶段汇总计数,如果计算为2,我们则将该条记录输出。这里我们有一个需要额外注意的地方。我们只有将R和T表中相同的的记录都发送到了同意Reduce节点才会被其正确的判断为是交集中的一个记录而输出,因此我们必须保证相同的记录会被发送到相同的Reduce节点。由于实现时使用了RelationA对象作为主键,这是MapReduce默认会通过对象的hashcode值来划分Map的中间结果并输出到不同的Reduce节点,因此这里我们需要重写自定义类的hashCode方法使得值相同的对象的hashCode值也一定相同。例如,这里对应关系R的类的定义如下(给出了重写的hashCode方法,省略了其他方法),我们需要根据四个域的值来重写hashCode()方法使得具有相同域值的记录具有相同的哈希值。
  1. public class  RelationA implementsWritableComparable<RelationA>
  2. {
  3.    private int id;
  4.    private String name;
  5.    private int age;
  6.    private int grade;
  7.    //重写的hashCode方法
  8.    @Override
  9.    public int hashCode() {
  10.       int result = 17;
  11.       result= 31 * result + id;
  12.       result= 31 * result + name.hashCode();
  13.       result= 31 * result + age;
  14.       result= 31 * result + grade;
  15.       return result;
  16.    }
  17. }
  18. /**
  19.     * 交运算  Start
  20.     * 交运算Mapper实现代码如下
  21.     */
  22.    public static class IntersectionMap extends Mapper<LongWritable,Text, RelationA, IntWritable>{
  23.       private IntWritable one = new IntWritable(1);
  24.       
  25.       //对于每一条记录发送(recorf, 1)出去
  26.       protected void map(LongWritableoffSet, Text line, Context context) throws IOException ,InterruptedException {
  27.           RelationArecord = newRelationA(line.toString());
  28.           context.write(record,one);
  29.          
  30.       }
  31.    }
  32.    public static class IntersectionReducer extends Reducer<RelationA,IntWritable, RelationA, NullWritable>{
  33.       //统计一条记录的值的和,等于2,则是两个关系的交
  34.      protected void reduce(RelationA key,Iterable<IntWritable> value, Context context) throws IOException ,InterruptedException {
  35.           int sum = 0;
  36.           for(IntWritable val :value){
  37.              sum+= val.get();
  38.           }
  39.           if(sum == 2){//等于2 ,则发送出去
  40.              context.write(key,NullWritable.get());
  41.           }else{
  42.              System.out.println("find an exception!");
  43.           }
  44.       }
  45.    }
复制代码

4、差运算
      例如。计算R-T(这里关系T和关系R为同一模式),也即希望找出在R中存在而在T中不存在的记录,则对于R和T中的每一条记录r在Map阶段分别输出键值对(r,R)和(r,T)。在Reduce阶段检查一条记录r的所有对应值列表,如果只有R而没有T则将该条记录输出。这里与上面的交运算相似,都需要注意相同的记录应该被发送到相同的Reduce节点。
   差运算Map端实现代码如下:
  
  1. //差运算Map端实现代码如下  start
  2.    public static class DifferenceMap extends Mapper<Text,ByteWritable, RelationA, Text>{
  3.       //对于每一条记录发送简直对(record,relationName)出去
  4.       protected void map(Text relationName,BytesWritable content, Context context) throws IOException ,InterruptedException {
  5.           String[]records = newString(content.getBytes(),"UTF-8").split("\\n");
  6.           for (int i = 0; i < records.length; i++) {
  7.              RelationArecord = newRelationA(records);
  8.              context.write(record,relationName);
  9.           }
  10.          
  11.       }
  12.    }
  13.    代码中我们以整个关系文件作为一个Map节点的输入,在输出键值对时,键为每条记录项,而值则为该关系的名称。
  14.    Reduce端代码如下:
  15.    public static class DifferenceReduc extends Reducer<RelationA,Text, RelationA, NullWritable>{
  16.       StringsetR;
  17.       //获得用户设置的减集的名称
  18.       protected void setup(Context context) throws IOException,InterruptedException {
  19.           setR =context.getConfiguration().get("setR");
  20.       }
  21.       protected void reduce(RelationA key,java.lang.Iterable<Text> value, Context context) throws IOException,InterruptedException {
  22.           //检查来自一条记录的关系名称中有没有减集,没有则发送出去
  23.           for (Text val : value) {
  24.              if(!val.toString().equals(setR)){
  25.                 return;
  26.              }
  27.              context.write(key,NullWritable.get());
  28.           }
  29.       }
  30.    }
复制代码

在Reduce端我们在一个键的所有值汇总查询有没有减集,如果没有,则该记录需要被输出。

5、自然连接
         例如,我们需要在属性ID上做关系R和关系S的自然连接。在Map阶段对于每一条R和S中的记录r,我们把它的ID的值作为键,其余属性的值以及R(S中的记录为S的名称)的名称作为值输出出去。在Reduce阶段我们则将统一键中所有的值,根据他们的来源(RR和S)分为两组做笛卡尔乘积然后将得到的结果输出出去。
         例如以上面的关系R和关系S为例。关系R中ID为1的记录会以键值对(1,(relation,张小雅,20,91))发射出去,而关系S中ID为1的记录会以键值对(1,(relationS,女,165))发射出去,这里在值前面添加来源关系的名称是为例Reduce端能够辨别键值对的来源。在Reduce端ID为1,的值有两个,按照它们的来源分组为两组(张小雅,20,91)和(女,165),然后将这两组进行笛卡尔乘积并添加上ID(也就是键)作为新的值发出去,这里新的值为:(1,张小雅,20,91,女,165)。
         自然连接操作Map端的实现代码如下:
  1. //自然连接操作Map端的实现代码    start
  2.    public static class NaturalJoinMap extends  Mapper<Text, BytesWritable, Text,Text>{
  3.       private int col;
  4.       
  5.       //获得用户设置的连接属性的列号
  6.       protected void setup(Context context) throws IOException,InterruptedException {
  7.           col =context.getConfiguration().getInt("col", 0);
  8.       }
  9.       
  10.       protected void map(Text relationName,BytesWritable content, Context context) throws IOException ,InterruptedException {
  11.           String[]records = newString(content.getBytes(),"UTF-8").split("\\n");
  12.           for (int i = 0; i < records.length; i++) {
  13.              RelationArecord = newRelationA(records);
  14.              context.write(new Text(record.getCol(col)), newText(relationName.toString() + " " + record.getValueExcept(col)));
  15.           }
  16.       }
  17.    }
复制代码

   Map端首先从用户获得需要连接的属性的列号,谈后对于每一条记录,以相应属性上的值作为键,剩余的树形作为值发送键值对到Reduce端。
         Reduce端的实现代码如下:

  1.   //Reduce端的实现代码如下
  2.    public static class NaturalJoinRduce extends Reducer<Text, Text,Text, NullWritable>{
  3.       //存储关系名称
  4.       private String relationNameA;
  5.       protected void setup(Context context) throws IOException,InterruptedException {
  6.           relationNameA =context.getConfiguration().get("relationNameA");
  7.       }
  8.       protected void reduce(Text key,java.lang.Iterable<Text> values, Context context) throws IOException,InterruptedException {
  9.           ArrayList<Text>setR = newArrayList<Text>();
  10.           ArrayList<Text>setS = newArrayList<Text>();
  11.           //按照来源分为两组,然后做笛卡尔乘积
  12.           for (Text val : values) {
  13.              String[]recordInfo = val.toString().split(" ");
  14.              if(recordInfo[0].equalsIgnoreCase(relationNameA)){
  15.                 setR.add(new Text(recordInfo[1]));
  16.              }else{
  17.                 setS.add(new Text(recordInfo[1]));
  18.              }
  19.           }
  20.           for (int i = 0; i <setR.size(); i++) {
  21.              for (int j = 0; j <setS.size(); j++) {
  22.                 Textt = newText(setR.get(i).toString() + "," + key.toString() + "," + setS.get(j).toString());
  23.                 context.write(t,NullWritable.get());
  24.              }
  25.           }
  26.       }
  27.    }
复制代码

         除了以上这些常用的关系代数算法外,MapReduce当然还可以更多的关系代数操作。因为大多数的关系代数操作都可以分解为关系数据库中每一条数据的操作。所以可以很方便地应用MapReduce计算框架来进行算法设计。





相关帖子:
MapReduce 基础算法程序设计(1)

MapReduce 基础算法程序设计(2)

MapReduce 基础算法程序设计(3):单词共现算法
欢迎加入about云群425860289432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(1)人评论

跳转到指定楼层
kaka100 发表于 2015-5-9 17:20:57
真不容易,还没看懂,先存一份吧
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条