
Hadoop Basics: A Summary of Pig Getting-Started Experience

Posted by nettman on 2014-8-22 18:46:37
Reading guide:
1. How is Pig's UNION like, and unlike, SQL's UNION?
2. What complex data types does Pig provide?
3. How does basic Pig syntax compare with a traditional database's?





Pig overview
Pig is a layer built on top of Hadoop, much like Hive. But where Hive is declarative (its language is SQL-like), Pig is a procedural language: like a stored procedure, it transforms the data step by step.
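A minimal sketch of that step-by-step style (the file name and fields here are invented for illustration):

users  = LOAD 'users.txt' AS (id:int, name:chararray, age:int);
adults = FILTER users BY age >= 18;
byAge  = GROUP adults BY age;
counts = FOREACH byAge GENERATE group AS age, COUNT(adults) AS cnt;
DUMP counts;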

Pig data types
  • double > float > long > int > bytearray
  • tuple | bag | map | chararray > bytearray
double, float, long, int, chararray, and bytearray are Pig's simple types.
A tuple is like an array, except its fields may have different types, e.g. ('dirkzhang','dallas',41).
A bag is a collection of tuples, e.g. {('dirk',41),('kedde',2),('terre',31)}; GROUP produces bags.
A map is a hash table whose keys are chararrays and whose values may be any type, e.g. ['name'#'dirk','age'#36,'num'#41].
A null means not merely that the data is absent, but that it is unknown.
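A sketch of how these complex types appear in a LOAD schema (the file and field names are invented):

a = LOAD 'data' AS (t:tuple(x:int, y:int), b:bag{pair:tuple(u:int, v:int)}, m:map[]);
DESCRIBE a;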

pig latin语法

1: LOAD
LOAD 'data' [USING function] [AS schema];
For example:
load20 = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' USING com.bonc.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);

Table = load 'url' as (id, name, ...);    // besides the equals sign, the alias and LOAD must be separated by spaces or Pig reports an error; the path must be quoted, and only single quotes are accepted.
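A more typical local load, for contrast (a sketch; the file name and delimiter are assumptions):

users = LOAD 'users.csv' USING PigStorage(',') AS (id:int, name:chararray);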

2: FILTER
alias = FILTER alias BY expression;
Table = filter Table1 by A; // A can be a predicate such as id > 10, not name matches '...', or ... is not null; conditions can be combined with and / or.
For example:
filtered = filter load20 by (MONTH_ID == '1210' and DAY_ID == '18' and PROV_ID == '010');
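A sketch combining several of these predicates on the users relation loaded above (the pattern is made up):

f = FILTER users BY (id > 10) AND (name IS NOT NULL) AND (name MATCHES 'zhang.*');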


3: GROUP
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression ...] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
Pig's GROUP does not just group the data; it also regroups the schema, into the form groupcolumn:bag.
Table3 = group Table2 by id; or Table3 = group Table2 by (id, name); -- the parentheses are required when grouping by more than one field.
ALL puts every record into a single group, which is useful for global aggregates, as sketched below.
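A sketch of GROUP ... ALL, reusing the users relation from the LOAD section:

everything = GROUP users ALL;
total = FOREACH everything GENERATE COUNT(users);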

4: FOREACH
alias = FOREACH alias GENERATE expression [AS schema] [, expression [AS schema] ...];

alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; ...]
GENERATE expression [AS schema] [, expression [AS schema] ...]
};

FOREACH is normally used together with GENERATE:
Table = foreach Table generate (id, name); -- the parentheses may be included or omitted here.
avg = foreach Table generate group, AVG(age); -- likewise MAX, MIN, and so on.

When trimming data, use foreach ... generate as early as possible to project away unneeded columns and reduce the data exchanged between stages. A sketch of the nested form follows.
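A sketch of the nested FOREACH form, keeping one record per group (relation and field names reuse the earlier users sketch):

grp = GROUP users BY id;
firsts = FOREACH grp {
    sorted = ORDER users BY name;
    one = LIMIT sorted 1;
    GENERATE group, FLATTEN(one);
};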

5: JOIN
Inner join syntax:
alias = JOIN alias BY {expression|'('expression [, expression ...]')'} (, alias BY {expression|'('expression [, expression ...]')'} ...) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];



Outer join syntax:
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];


join / left join / right join:
daily = load 'A' as (id, name, sex);
divs  = load 'B' as (id, name, sex);



Inner join:
jnd = join daily by (id, name), divs by (id, name);



Left join:
jnd = join daily by (id, name) left outer, divs by (id, name);


More than two relations can be joined at once, but only as an inner join:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
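When one input is small enough to fit in memory, the 'replicated' option from the syntax above gives a map-side join; a sketch using the daily/divs relations defined earlier (the relation listed last is the one replicated into memory):

jnd2 = JOIN daily BY id, divs BY id USING 'replicated';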



6: UNION
alias = UNION [ONSCHEMA] alias, alias [, alias ...];

UNION corresponds to SQL's UNION, except that Pig's UNION does not remove duplicates and can be applied to two relations with different schemas: if the two schemas match, the result keeps that schema; if one relation's schema can be coerced into the other's by implicit casts, the result takes the coerced schema; otherwise, the result has no schema.

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;

C: {x: int,y: float}

A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.

Note: in Pig 1.0, the last kind of UNION (where no schema results) cannot be executed.

To UNION two relations whose column names differ, use the ONSCHEMA keyword:
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}

After JOIN and UNION, the field aliases change (see "How Pig maintains aliases" below).

7: DUMP
dump alias;
Displays the data on screen (and triggers execution of the script up to that point).

8: ORDER BY
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] ...] } [PARALLEL n];
A = order Table by id desc;

9: DISTINCT
A = distinct alias; -- removes duplicate tuples; it compares whole records, not single fields.

10: LIMIT
A = limit alias 10;

11: SAMPLE
SAMPLE alias size;

Randomly samples the given fraction (between 0 and 1) of the data.
some = sample divs 0.1;

12: CROSS
alias = CROSS alias, alias [, alias ...] [PARTITION BY partitioner] [PARALLEL n];

Computes the Cartesian product of two or more relations: every record of each input is combined with every record of the others.
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;


13: SPLIT
Syntax:
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression ...] [, alias OTHERWISE];

A = LOAD 'data' AS (f1:int,f2:int,f3:int);
DUMP A;
(1,2,3)
(4,5,6)
(7,8,9)
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);

DUMP X;
(1,2,3)
(4,5,6)

DUMP Y;
(4,5,6)

DUMP Z;
(1,2,3)
(7,8,9)

Note that a record can be routed to more than one output ((4,5,6) satisfies both the X and the Y condition), and records matching no condition are discarded unless an OTHERWISE branch is given.

14: STORE
STORE alias INTO 'directory' [USING function];
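A sketch of a typical store, writing out the counts relation from the earlier sketch (the output directory and delimiter are assumptions):

STORE counts INTO 'output' USING PigStorage('|');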


How Pig maintains aliases:

1. JOIN
e.g. e = join d by name, b by name;
g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as three:chararray, $3 as four:int;
The schemas this generates:

e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}

g: {one: chararray,two: int,three: chararray,four: int}
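Because both inputs carry a field called name, it has to be disambiguated with the :: prefix visible in e's schema; a sketch using the aliases above:

names = FOREACH e GENERATE d::name, b::age;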

2. GROUP

B = GROUP A BY age;

-----------------------------------------------------------------
| B     | group: int | A: bag({name: chararray,age: int,gpa: float}) |
-----------------------------------------------------------------
|       | 18         | {(John, 18, 4.0), (Joe, 18, 3.8)}             |
|       | 20         | {(Bill, 20, 3.9)}                             |
-----------------------------------------------------------------
(18,{(John,18,4.0F),(Joe,18,3.8F)})
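A sketch of consuming that grouped schema, projecting a column out of the bag (names taken from the table above):

avg_gpa = FOREACH B GENERATE group AS age, AVG(A.gpa);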


Pig UDFs
Pig supports embedding user-defined functions (UDFs). A simple UDF extends EvalFunc and is typically used in FILTER or FOREACH:

public class MyUDF extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        // Called once per input tuple; returning null yields a Pig null.
        if (input == null || input.size() == 0)
            return null;
        try {
            String val = (String) input.get(0);
            return new StringBuffer(val).append(" pig").toString();
        } catch (Exception e) {
            throw new IOException(e.getMessage());
        }
    }
}
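A sketch of invoking such a UDF from a script (the jar name and package are assumptions):

REGISTER myudfs.jar;
a = LOAD 'data' AS (s:chararray);
b = FOREACH a GENERATE com.example.MyUDF(s);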
Pig also supports UDFs in loaders and storers:
a loader UDF extends LoadFunc;
a storer UDF extends StoreFunc.
This is analogous to writing an InputFormat and OutputFormat in Hadoop; the Vertica loader, for instance, is a database-backed implementation of exactly this kind.

Here is a simple loader example:
public class MyLoader extends LoadFunc {
    protected RecordReader recordReader = null;
    private Connection conn;
    private PreparedStatement ps;
    private ResultSet rs;
    private final String jdbcURL;
    private final String user;
    private final String pwd;
    private final String querySql;

    public MyLoader(String driver, String jdbcURL, String user, String pwd, String querySql) {
        // Load the JDBC driver once when the loader is constructed.
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver not found: " + driver, e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pwd = pwd;
        this.querySql = querySql;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.recordReader = reader;
        // Open the connection and run the query before the first getNext() call.
        try {
            conn = DriverManager.getConnection(jdbcURL, user, pwd);
            ps = conn.prepareStatement(querySql);
            rs = ps.executeQuery();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        // The important reading step: turn each JDBC row into a Pig tuple;
        // returning null signals the end of the input.
        try {
            if (rs == null || !rs.next()) {
                return null;
            }
            ResultSetMetaData rsmd = rs.getMetaData();
            Tuple t = TupleFactory.getInstance().newTuple(rsmd.getColumnCount());
            for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                t.set(i - 1, rs.getObject(i));
            }
            return t;
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Nothing to do here: the data source is the JDBC URL, not an HDFS path.
    }

    public ResourceSchema getSchema(String location, Job job) throws IOException {
        // A fixed two-column schema; a real loader would read it from the table metadata.
        Schema schema = new Schema();
        schema.add(new FieldSchema("name", DataType.CHARARRAY));
        schema.add(new FieldSchema("position", DataType.INTEGER));
        return new ResourceSchema(schema);
    }
}

The getNext method determines how the data produced by the reader is turned into tuples;
getSchema can fix the schema of the data being read;
setLocation handles the input data source;
prepareToRead runs before any reading starts, so markers and other setup can be done there.




