分享

MapReduce中设置全局变量

hyj 2014-4-23 16:02:50 发表于 问题解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 73630
问题导读:
1.MapReduce的应用场景是什么?
2.如何通过Configuration来设置全局变来那个?
3.如何获取设置的全局参数?






实际项目中遇到这样一个场景,需要运行一个MapReduce统计一些数据中的最大最小平均值等特性,将结果存入到HBase中。存结果的同时还要记录这次分析任务的编号,即所有的Reduce产生的结果中都要包含这个任务编号这个字段。当然我们可以把这个任务编号放到输入文件中的每一行中,作为输入数据的一部分,不过这样做显然太不专业,无端的增加了要处理的数据量,加重网络负担。经过网上搜索,发现可以用Configuration来实现。具体过程:

提交job的函数中

  1. Configuration conf = new Configuration();
  2. conf.setStrings("job_parms", "aaabbc"); //关键就是这一句
  3.         Job job = new Job(conf, "load analysis");        
  4.         job.setJarByClass(LoadAnalysis.class);
  5.         job.setMapperClass(LoadMapper.class);
  6.         job.setReducerClass(LoadIntoHbaseReduce.class);
  7.         job.setMapOutputKeyClass(Text.class);
  8.         job.setMapOutputValueClass(Text.class);
  9.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
复制代码
Mapper类中重写setup函数



  1.   @Override
  2.         protected void setup(Context context)
  3.                 throws IOException, InterruptedException {
  4.             try {
  5.               
  6.                 //从全局配置获取配置参数
  7.                 Configuration conf = context.getConfiguration();
  8.                 String parmStr = conf.get("job_parms"); //这样就拿到了
  9.                
  10.                ......
  11.                
  12.             } catch (SQLException e) {
  13.                
  14.                 e.printStackTrace();
  15.             }
  16.             
  17.         }
复制代码

当然 Reduce类中也可以同样操作




已有(7)人评论

跳转到指定楼层
pig2 发表于 2014-4-23 16:08:37
上面这个方法有一个限制:
conf.set()方法只能传递String类型的变量,如果在mapper或reducer中更合适使用别的类型(比如List,Map等),则需要通过一些方法来转换,而如果转换方法比较复杂或者数据量比较大,则对整个程序的效率会产生很大的影响,因为上面这种方法每次调用map函数都会进行同样的转换,这种重复性的工作是不可容忍的,并且也违反了我们使用全局变量的初衷。

替代的方法是将static变量定义为mapper类的成员变量,并在static块或static成员函数中进行初始化,这样整个mapper过程只需对static变量初始化一次,这对效率会有很大的提高。

程序结构如下:

        public class myMR{
          public static class myMapper extends Mapper<Text, Text, Text, Text>{
               private static Set<**> subglobal;
               //other static variable
               static{
                    //initialize 'subglobal'
               }
               //other static blocks
               public void map(Text key, Text value, Context context)
                                      throws IOException, InterruptedException{
                    //直接使用subglobal
               }
          }
          public static class myReducer extends Reducer<Text, Text, Text, Text>{
               public void reducer(Text key, Iterable<Text>, Context context)
                                     throws IOException, InterruptedException{
                     //reducer process
               }
          }
          public static void main(String[] args){
               //your job
          }

对java熟悉的话很容易就可以看出来,这只是个static对象初始化问题。但是这种方法使用的并不是mapper和reducer角度上的全局变量,而只是站在某个mapper task角度上的“全局变量”。而这种情况下除了使用static变量及其初始化块这种方法之外,还可以添加自定义的setup(Context)和close(Context)函数来初始化,这两个方法只在某mapper或reducer task开始和结束的时候分别调用一次,可以起到与上面代码所用方法一样的效果。

回复

使用道具 举报

hyj 发表于 2014-4-23 16:18:39
找到一些关于这方面的资料,补充一下:

mapreduce编程全局变量:conf.set().get()

文件reduce输出作为map输入:中间临时文件sequencefile格式(map输入三种格式之一)

reduce结果排序:setPartitionerClass(TotalOrderPartitioner.class)

获取输入文件名:context.get

全局文件:hadoop有distributed cache来保存全局文件,保证所有node都可以访问,使用类名为DistributedCache
回复

使用道具 举报

a553392350 发表于 2014-4-24 14:38:34
写的比较好,值得一看
回复

使用道具 举报

perfri 发表于 2014-4-27 23:29:52
值得一看的帖子,谢谢分享
回复

使用道具 举报

monkey131499 发表于 2015-1-6 10:11:57
学习了,正好遇到这方面问题
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条