问题导读
1、什么时候需要用户自定义函数呢?
2、如何定义isValid函数?
3、在自定义运算函数(Eval function)中,需要注意schema的什么?
我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。
用户自定义函数
什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。
Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。
自定义过滤函数
我们仍然以先前的代码为例:
- records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);
- valid_records = filter records by temperature!=999;
复制代码
第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:
- valid_records = filter records by isValid(temperature);
复制代码
这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:
- packagecom.oserp.pigudf;
-
- importjava.io.IOException;
-
- importorg.apache.pig.FilterFunc;
-
- importorg.apache.pig.data.Tuple;
-
-
- public class IsValidTemperature extends FilterFunc {
-
- @Override
-
- public Boolean exec(Tuple tuple)throws IOException {
-
- Object object = tuple.get(0);
-
- int temperature = (Integer)object;
-
- return temperature != 999;
-
- }
-
- }
复制代码
接下来,我们需要:
1) 编译代码并打包成jar文件,比如pigudf.jar。
2) 通过register命令将这个jar文件注册到pig环境:
register/home/user/hadoop_jar/pigudf.jar //参数为jar文件的本地路径
此时,我们就可以用以下语句调用这个函数:
- valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
-
- dump valid_records;
复制代码
看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:
- define isValid com.oserp.pigudf.IsValidTemperature();
- valid_records = filter records by isValid(temperature);
- dump valid_records;
复制代码
回到代码,我们可发现:
1) 需要定义一个继承自FilterFunc的类。
2) 重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。
3) 调用时,需要使用类的全名。(当然你可以自定义别名)
4) 更多的验证需要读者自行在函数中添加,比如判断是否为null等等。
备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。
自定义运算函数(Eval function)
仍然以前面的数据文件为例:
- 1990 21
-
- 1990 18
-
- 1991 21
-
- 1992 30
-
- 1992 999
-
- 1990 23
复制代码
假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:
温度 分类
x>=30 hot
x>=10 and x<30 moderate
x<10 cool
则我们可以定义以下函数,代码如下:
- packagecom.oserp.pigudf;
-
- importjava.io.IOException;
-
- importorg.apache.pig.EvalFunc;
-
- importorg.apache.pig.data.Tuple;
-
-
- public class GetClassification extends EvalFunc<String> {
-
- @Override
-
- public String exec(Tuple tuple)throws IOException {
-
- Object object = tuple.get(0);
-
- int temperature = (Integer)object;
-
-
- if (temperature >= 30){
-
- return "Hot";
-
- }
-
- else if(temperature >=10){
-
- return "Moderate";
-
- }
-
- else {
-
- return "Cool";
-
- }
-
- }
-
- }
复制代码
依次输入以下Pig语句:
- records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);
-
- register /home/user/hadoop_jar/pigudf.jar;
-
- valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
-
- result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);
-
- dump result;
复制代码
输出结果如下:
- (1990,Moderate)
-
- (1990,Moderate)
-
- (1991,Moderate)
-
- (1992,Hot)
-
- (1990,Moderate)
复制代码
代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。
有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:
- DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');
- B = FOREACH A GENERATE trim(name);
复制代码
其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。
自定义加载函数
我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)
一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:
- Thisis a map a reduce program
- mapreduce partition combiner
复制代码
我们希望load后能得到如下形式的数据,每个单词一个tuple:
- (This)
- (is)
- (a)
- (map)
- (a)
- (reduce)
复制代码
先看代码:
复制代码
依次执行以下命令:
- records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});
- flatten_records= foreach records generate flatten($0);
- grouped_records= group flatten_records by words::w;
- result= foreach grouped_records generate group,COUNT(flatten_records);
- final_result= order result by $1 desc,$0;
- dumpfinal_result;
复制代码
显示结果如下:
- (a,2)
- (map,2)
- (reduce,2)
- (This,1)
- (combiner,1)
- (is,1)
- (partition,1)
- (program,1)
复制代码
注意schema的定义格式:(words:bag{word:(w:chararray)})
|