breaking 发表于 2016-3-31 14:00:46

Spark SQL UDF使用,sparksqludf使用

本帖最后由 breaking 于 2016-3-31 14:56 编辑


问题导读:

1.Spark Sql UDF怎么使用?
2.案例分析使用过程?


static/image/hrline/4.gif

Spark1.1推出了Uer Define Function功能,用户可以在Spark SQL 里自定义实际需要的UDF来处理数据。
因为目前Spark SQL本身支持的函数有限,一些常用的函数都没有,比如len, concat...etc 但是使用UDF来自己实现根据业务需要的功能是非常方便的。   Spark SQL UDF其实是一个Scala函数,被catalyst封装成一个Expression结点,最后通过eval方法计根据当前Row计算UDF的结果,源码分析见:Spark SQL源码分析之UDF   Spark SQL UDF使用起来非常方便,分2个步骤:   
一、注册   当我们导入了SQLContext或者HiveContext,即有注册UDF的功能。   registerFunction(udfName : String, func : FunctionN)    由于scala语言的限制,这里UDF的参数仅支持22个。
二、使用    select udfName(param1, param2....) from tableName
三、示例我们这里创建2张表: 第一张dual会从README.md读取记录,里面仅有一个字段line : String 第二张表src,有2个字段key,value,数据是spark sql自带的测试数据。
我们使用 sbt/sbt hive/console进入测试环境:
1、字符串取长度 len()

创建table dual:
scala> sql("create table dual(line string)").collect()
14/09/19 17:41:34 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:, location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
14/09/19 17:41:34 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr      cmd=create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:, location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
载入README.md数据:

sql("load data local inpath 'README.md' into table dual ").collect()

scala> sql("select * from dual").collect()
res4: Array = Array([# Apache Spark], [], , , , , , , [], [<http://spark.apache.org/>], [], [], [## Online Documentation], [], , http://spark.apache.org/documentation.html>.], , [], [## Building Spark], [], ...

编写len函数并,注册函数:
scala> registerFunction("len",(x:String)=>x.length)

测试:

scala> sql("select len(line) from dual").collect()14/09/19 17:45:07 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.072239295 sres6: Array = Array(, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ...
2、字符串连接concat_str

这里为了简单起见,就根据src表的key value类型 Int, String来做例子:

scala> sql("desc src").collect()res8: Array = Array(, )scala> sql("select * from src limit 10").collect()res7: Array = Array(, , , , , , , , , )
编写并注册concat_str函数:

scala> registerFunction("concat_str",(a:Int, b:String)=>a.toString+b)

测试concat函数

scala> sql("select concat_str(key,value) from src ").collect()

14/09/19 18:17:22 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.082076377 s
res28: Array = Array(, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , [207val_...
scala>


原文链接:http://www.bkjia.com/yjs/881134.html

BIGSPARK 发表于 2016-4-1 14:22:34

这个自定义函数是只有spark1.1支持吗,我的spark是1.3.1,在输入registerFunction时报error,请问是怎么回事

a530491093 发表于 2016-4-5 16:46:15

good!!!!
页: [1]
查看完整版本: Spark SQL UDF使用,sparksqludf使用