hadoop初级 pig入门经验总结
问题导读:1.UNION与传统sql的unoin的相同与不同之处?
2.pig有哪些符合数据类型?
3.比较pig基本语法与传统数据库的区别?
static/image/hrline/4.gif
[*]pig简介
[*]pig数据类型
[*]pig latin语法
[*]pig udf自定义
[*]pig derived衍生
[*]
[*]推荐网站 http://pig.apache.org/docs/r0.10.0/basic.html
pig简介
pig是hadoop上层的衍生架构,与hive类似。对比hive(hive类似sql,是一种声明式的语言),pig是一种过程语言,类似于存储过程一步一步得进行数据转化。
pig数据类型
[*]double > float > long > int > bytearray
[*]tuple|bag|map|chararray > bytearray
double float long int chararray bytearray都相当于pig的基本类型tuple相当于数组 ,但是可以类型不一,举例('dirkzhang','dallas',41)Bag相当于tuple的一个集合,举例{('dirk',41),('kedde',2),('terre',31)},在group的时候会生成bagMap相当于哈希表,key为chararray,value为任意类型,例如['name'#dirk,'age'#36,'num'#41nulls 表示的不只是数据不存在,他更表示数据是unkown
pig latin语法
1:loadLOAD 'data' ; 例如: load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' USING com.bonc.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);
Table = load ‘url’ as (id,name…..); //table和load之间除了等号外 还必须有个空格 不然会出错,url一定要带引号,且只能是单引号。
2:filter alias = FILTER alias BY expression; Table = filter Table1 by + A; //A可以是 id > 10;not name matches ‘’,is not null 等,可以用and和or连接各条件 例如: filter = filter load20 by ( MONTH_ID == '1210' andDAY_ID == '18' andPROV_ID == '010' );
3:groupalias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] ; pig的分组,不仅是数据上的分组,在数据的schema形式上也进行分组为groupcolumn:bag Table3 = group Table2 by id;也可以Table3 = group Table2 by (id,name);括号必须加 可以使用ALL实现对所有字段的分组
4:foreachalias = FOREACH alias GENERATE expression ….];
alias = FOREACH nested_alias {alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]GENERATE expression ….]};
一般跟generate一块使用 Table = foreach Table generate (id,name);括号可加可不加。avg = foreach Table generate group, AVG(age);MAX ,MIN..
在进行数据过滤时,建议尽早使用foreach generate将多余的数据过滤掉,减少数据交换
5:joinInnerjoin Syntaxalias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) ;
Outer join Syntaxalias = JOIN left-alias BY left-alias-column , right-alias BY right-alias-column ;
join/left join / right joindaily = load 'A' as (id,name, sex);
divs= load 'B' as (id,name, sex);
joinjnd = join daily by (id, name), divs by (id, name);
left joinjnd = join daily by (id, name) left outer, divs by (id, name);
也可以同时多个变量,但只用于inner joinA = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
6: unionalias = UNION alias, alias [, alias …];
union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与 变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量 没有模式。
A = load 'input1' as (x:int, y:float);B = load 'input2' as (x:int, y:float);C = union A, B;describe C;
C: {x: int,y: float}
A = load 'input1' as (x:double, y:float);B = load 'input2' as (x:int, y:double);C = union A, B;describe C;C: {x: double,y: double}
A = load 'input1' as (x:int, y:float);B = load 'input2' as (x:int, y:chararray);C = union A, B;describe C;Schema for C unknown.
注意:在pig 1.0中 执行不了最后一种union。
如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字A = load 'input1' as (w: chararray, x:int, y:float);B = load 'input2' as (x:int, y:double, z:chararray);C = union onschema A, B;describe C;C: {w: chararray,x: int,y: double,z: chararray}
join和union之后alias的别名会变
7:Dump dump alias用于在屏幕上显示数据。
8:Order byalias = ORDER alias BY { * | field_alias [, field_alias …] } ; A = order Table by id desc;
9:distinct A = distinct alias;
10:limit A = limit alias 10;
11:sampleSAMPLE alias size;
随机抽取指定比例(0到1)的数据。some = sample divs 0.1;
13:crossalias = CROSS alias, alias [, alias …] ;
将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。--cross.pigdaily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,close:float, volume:int, adj_close:float);divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);tonsodata = cross daily, divs parallel 10;
15:splitSyntaxSPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];
A = LOAD 'data' AS (f1:int,f2:int,f3:int);DUMP A;(1,2,3)(4,5,6)(7,8,9)SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);
DUMP X;(1,2,3)(4,5,6)
DUMP Y;(4,5,6)
DUMP Z;(1,2,3)(7,8,9)
16:store Store… into … Using…
pig在别名维护上:
1、join如e = join d by name,b by name; g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as three:chararray,$3 asfour:int; 他生成的schemal:
e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}
g: {one: chararray,two: int,three: chararray,four: int}
2、group
B = GROUP A BY age;
----------------------------------| B | group: int | A: bag({name: chararray,age: int,gpa: float}) |----------------------------------| | 18 | {(John, 18, 4.0), (Joe, 18, 3.8)} || | 20 | {(Bill, 20, 3.9)} |---------------------------------- (18,{(John,18,4.0F),(Joe,18,3.8F)})
pig udf自定义
pig支持嵌入user defined function,一个简单的udf 继承于evalFunc,通常用在filter,foreach中
public class MyUDF extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
if(input == null || input.size() ==0)
return null;
try {
String val = (String) input.get(0);
return new StringBuffer(val).append(" pig").toString();
} catch (Exception e) {
throw new IOException(e.getMessage());
}
}
}pig支持udf in loader and storeudf loader 需要继承于LoadFuncudf storer 需要继承于StoreFunc这类似于hadoop中写inputformat和outputformat其中vertica就是写了一个DB版本的
这里贴一个简单的loader的例子:public class MyLoader extends LoadFunc{
protected RecordReader recordReader = null;
private PreparedStatement ps;
private Connection conn;
private final String jdbcURL;
private final String user;
private final String pwd;
private final String querySql;
private ResultSet rs;
public MyLoader(String driver,String jdbcURL,String user,String pwd,String querySql){
try {
Class.forName(driver);
} catch (Exception e) {
// TODO: handle exception
}
this.jdbcURL = jdbcURL;
this.user = user;
this.pwd = pwd;
this.querySql = querySql;
}
@Override
public InputFormat getInputFormat() throws IOException {
return new PigTextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// TODO 重要的读取过程
Text val = null;
boolean next = false;
try {
next = rs.next();
} catch (Exception e) {
// TODO: handle exception
}
if(!next)
return null;
ResultSetMetaData rsmd;
try {
// rsmd = result
} catch (Exception e) {
// TODO: handle exception
}
return null;
}
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1)
throws IOException {
this.recordReader = arg0;
}
@Override
public void setLocation(String arg0, Job arg1) throws IOException {
//no idea
}
public ResourceSchema getSchema(String location,Job job) throws IOException{
Configuration conf = job.getConfiguration();
Schema schema = new Schema();
try {
//TODO:reader from database table
// Connection conn = DriverManager.getConnection(this.jdbcURL, this.user, this.pwd);
FieldSchema fieldName = new FieldSchema("name", DataType.CHARARRAY);
FieldSchema fieldPosition = new FieldSchema("position", DataType.INTEGER);
schema.add(fieldName);
schema.add(fieldPosition);
} catch (Exception e) {
//TODO log exception
}
return null;
}
public void prepareToRead(){
}
}
其中getNext方法就是如何处理reader读取出的数据
getSchema可以固定读取数据的schema
setLocation可以处理输入的数据源
prepareToRead是读取数据之前,可以在此做标识,等等
{:soso_e179:}
页:
[1]