分享

Pig用户自定义函数(UDF)

xioaxu790 发表于 2014-8-25 18:09:24 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11450
问题导读
1、什么时候需要用户自定义函数呢?
2、如何定义isValid函数?
3、在自定义运算函数(Eval function)中,需要注意schema的什么?





我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。

用户自定义函数
什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。
Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。

自定义过滤函数
我们仍然以先前的代码为例:
  1. records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);
  2. valid_records = filter records by temperature!=999;
复制代码


第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:
  1. valid_records = filter records by isValid(temperature);
复制代码


这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:
  1. packagecom.oserp.pigudf;
  2. importjava.io.IOException;
  3. importorg.apache.pig.FilterFunc;
  4. importorg.apache.pig.data.Tuple;
  5. public class IsValidTemperature extends FilterFunc {
  6.          @Override
  7.          public Boolean exec(Tuple tuple)throws IOException {            
  8.                    Object object = tuple.get(0);
  9.                    int temperature = (Integer)object;            
  10.                    return temperature != 999;
  11.          }
  12. }
复制代码


接下来,我们需要:
1)  编译代码并打包成jar文件,比如pigudf.jar。
2)  通过register命令将这个jar文件注册到pig环境:
register/home/user/hadoop_jar/pigudf.jar //参数为jar文件的本地路径

此时,我们就可以用以下语句调用这个函数:
  1. valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
  2. dump valid_records;
复制代码


看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:
  1. define isValid com.oserp.pigudf.IsValidTemperature();
  2. valid_records = filter records by isValid(temperature);
  3. dump valid_records;
复制代码


回到代码,我们可发现:
1)  需要定义一个继承自FilterFunc的类。

2)  重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。

3)  调用时,需要使用类的全名。(当然你可以自定义别名)

4)  更多的验证需要读者自行在函数中添加,比如判断是否为null等等。

备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。


自定义运算函数(Eval function)
仍然以前面的数据文件为例:
  1. 1990 21
  2. 1990 18
  3. 1991 21
  4. 1992 30
  5. 1992 999
  6. 1990 23
复制代码


假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:

温度                            分类

x>=30                          hot

x>=10 and x<30        moderate

x<10                                      cool

则我们可以定义以下函数,代码如下:
  1. packagecom.oserp.pigudf;
  2. importjava.io.IOException;
  3. importorg.apache.pig.EvalFunc;
  4. importorg.apache.pig.data.Tuple;
  5. public class GetClassification extends EvalFunc<String> {
  6.          @Override
  7.          public String exec(Tuple tuple)throws IOException {               
  8.                    Object object = tuple.get(0);
  9.                    int temperature = (Integer)object;
  10.                    if (temperature >= 30){
  11.                             return "Hot";
  12.                    }
  13.                    else if(temperature >=10){
  14.                             return "Moderate";
  15.                    }
  16.                    else {
  17.                             return "Cool";
  18.                    }               
  19.          }
  20. }
复制代码



依次输入以下Pig语句:
  1. records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);
  2. register /home/user/hadoop_jar/pigudf.jar;
  3. valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
  4. result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);
  5. dump result;
复制代码



输出结果如下:
  1. (1990,Moderate)
  2. (1990,Moderate)
  3. (1991,Moderate)
  4. (1992,Hot)   
  5. (1990,Moderate)
复制代码


代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。


有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:
  1. DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');
  2. B = FOREACH A GENERATE trim(name);
复制代码


其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。


自定义加载函数
我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)

一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:
  1. Thisis a map a reduce program
  2. mapreduce partition combiner
复制代码


我们希望load后能得到如下形式的数据,每个单词一个tuple:
  1. (This)
  2. (is)
  3. (a)
  4. (map)
  5. (a)
  6. (reduce)
复制代码



先看代码:
  1. package com.oserp.pigudf;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.hadoop.io.Text;
  6. importorg.apache.hadoop.mapreduce.InputFormat;
  7. import org.apache.hadoop.mapreduce.Job;
  8. importorg.apache.hadoop.mapreduce.RecordReader;
  9. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  11. import org.apache.pig.LoadFunc;
  12. importorg.apache.pig.backend.executionengine.ExecException;
  13. importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
  14. import org.apache.pig.data.BagFactory;
  15. import org.apache.pig.data.DataBag;
  16. import org.apache.pig.data.Tuple;
  17. import org.apache.pig.data.TupleFactory;
  18. public class WordCountLoadFunc extends LoadFunc {
  19.         
  20.          private RecordReader reader;
  21.          TupleFactorytupleFactory = TupleFactory.getInstance();
  22.          BagFactorybagFactory = BagFactory.getInstance();
  23.         
  24.          @Override        
  25.          public InputFormatgetInputFormat() throws IOException {
  26.                    return new TextInputFormat();
  27.          }      
  28.          @Override
  29.          public Tuple getNext()throws IOException {
  30.                     
  31.                    try {
  32.                             // 当读取到分区数据块的末尾时,返回null表示数据已读取完
  33.                             if (!reader.nextKeyValue()){
  34.                                      return null;
  35.                                      }
  36.                             Textvalue = (Text)reader.getCurrentValue();
  37.                             Stringline = value.toString();
  38.                             String[]words =  line.split("\\s+");// 断词
  39.                            
  40.                             // 因为getNext函数只能返回一个tuple,
  41.                             // 而我们希望每个单词一个单独的tuple,
  42.                             // 所以我们将多个tuple放到一个bag里面,
  43.                             // 然后返回一个包含一个bag的tuple。
  44.                             // 注:这只是一个用于演示用法的示例,实际中这样使用不一定合理。
  45.                             List<Tuple>tuples = new ArrayList<Tuple>();                    
  46.                             Tupletuple = null;
  47.                             for (String word : words) {
  48.                                      tuple= tupleFactory.newTuple();
  49.                                      tuple.append(word);
  50.                                      tuples.add(tuple);
  51.                                      }
  52.                            
  53.                             DataBagbag = bagFactory.newDefaultBag(tuples);
  54.                             Tupleresult = tupleFactory.newTuple(bag);
  55.                            
  56.                             return result;
  57.                    }
  58.                    catch (InterruptedException e) {
  59.                             throw new ExecException(e);
  60.                    }
  61.                   
  62.          }
  63.          @Override
  64.          public void prepareToRead(RecordReader reader,PigSplit arg1)
  65.                             throws IOException {
  66.                    this.reader = reader;
  67.          }
  68.          @Override
  69.          public void setLocation(String location, Job job) throws IOException {
  70.                    FileInputFormat.setInputPaths(job,location);         
  71.          }
  72. }
复制代码


依次执行以下命令:
  1.     records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});
  2.      flatten_records= foreach records generate flatten($0);
  3.      grouped_records= group flatten_records by words::w;
  4.       result= foreach grouped_records generate group,COUNT(flatten_records);
  5.       final_result= order result by $1 desc,$0;
  6.       dumpfinal_result;
复制代码



显示结果如下:
  1. (a,2)
  2. (map,2)
  3. (reduce,2)
  4. (This,1)
  5. (combiner,1)
  6. (is,1)
  7. (partition,1)
  8. (program,1)
复制代码


注意schema的定义格式:(words:bag{word:(w:chararray)})




已有(1)人评论

跳转到指定楼层
jancan 发表于 2015-9-14 22:45:16
非常有用,谢谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条