本帖最后由 levycui 于 2016-9-13 18:01 编辑
问题导读:
1、什么是DataFrames?
2、什么是DataFrames Schema?
3、Spark 1.2 Sql踩过的坑有哪些?
在spark sql 之前,spark团队是以兼容hive为目标而创建了shark,但是由于完全使用hive的东西,导致执行计划无法方便的做优化,并且hive的源码有一些线程安全的问题,所以spark团队不得不放弃的了shark。
spark团队在经历了shark的惨痛教训后,痛下决心,自行实现了名为spark sql 的执行引擎,其仅仅在hive方面hql parser(hive的语法与解析)、Hive Metastore和Hive SerDe,并由Catalyst负责执行计划生成和优化。
Spark Sql 的定位无疑是切合广大开发者,sql语法减少了学习成本,其的优化能力简化了开发者性能调优要求,让开发者不再每句代码都得小心翼翼的。
对hive的支持都移动到了 Hive on Spark中。
个人的观点来说,如果要在spark 上使用sql的方式的话,推荐以spark sql为主,对hive的支持与hive的发展总是有一定差距的,并且 spark 团队可能会调整Hive的支持代码.
spark sql 代码使用入口依然符合spark的习惯 - context的声明:
[mw_shl_code=applescript,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)[/mw_shl_code]
如果要使用hive的支持,入口只需换为 HiveContext,而不使用SQLContext就行.
DataFrames 介绍
DataFrames 官方解释其为一个由多个有含义和名字的列组织数据的分布式集合, 其的表现形式比较类同与关系型数据库的表概念.
一般来说,都是优先使用DataFrames而不直接使用RDD,因为DataFrames会优化你的执行计划,而RDD则是忠实的按照你代码生成执行计划,并且spark sql 中提供很多方法便利地从多种数据源生成DataFrames.
如下即为从json文件中生成DataFrames以及一些DataFrames的基本使用:
[mw_shl_code=applescript,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 创建DataFrames
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// 显示数据内容
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// 打印DataFrames的结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 获取name列的数据
df.select("name").show()
// name
// Michael
// Andy
// Justin
// 获取name和age列的数据,但是age的每个值都+1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// 获取age大于21的数据
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// 查询age的分组计数情况
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
[/mw_shl_code]
spark sql也支持隐式将RDD转化为DataFrame,只需加入如下代码:
[mw_shl_code=applescript,true]// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._[/mw_shl_code]
DataFrames Schema 介绍
DataFrame 可以以两种不同的方式从RDD转化而来,其实准确来说,转化只有一种方式,只是如何识别RDD,如何对应DataFrame的Schema信息有两种方式。
通过现有的class获取schema信息
例如如下代码:
[mw_shl_code=applescript,true]val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()[/mw_shl_code]
但是case calss 在scala 2.10中字段数量是有限的,只能22个,为了规避这个问题,你可以通过实现scala的Product接口:
通过构建具体的schema信息,比如设定字段名、类型等
例如如下代码:
[mw_shl_code=applescript,true]val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =
StructType(
"name age".split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)[/mw_shl_code]
Spark Sql Demo
我们的demo是简单的用sql的方式从csv文件中筛选数据。
[mw_shl_code=applescript,true]package Demo
import org.apache.spark._
import org.apache.spark.sql._
import scala.collection.mutable
object SqlDemo {
def main(argment: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local") //本地调试运行
val sc = new SparkContext(conf) // 建立spark操作上下文
val sqlContext = new SQLContext(sc) // 建立spark sql 操作上下文
val rowRDD = sc.textFile("xxxPath\\people.csv") //读取 csv 文件
.map(line => { // 转换成 row 结构
val data = line.split(";") // 切分字段
val list = mutable.ArrayBuffer[Any]()
list.append(data(0)) //填充数据
list.append(data(1).toInt) // 填充数据
Row.fromSeq(list) //创建row
})
val fields = new mutable.ArrayBuffer[StructField]()
fields.append(StructField("name",StringType,true)) //添加 name schema
fields.append(StructField("age",IntegerType,true)) //添加 age schema
val schema = StructType(fields) //创建schema 结构
val rdd = sqlContext.applySchema(rowRDD, schema) //将rdd与schema做匹配
rdd.registerTempTable("people") //注册临时表
sqlContext.sql("select * from people where age >= 18") // 筛选成年人士
.collect()
.foreach(println)
sc.stop()
}
}[/mw_shl_code]
Spark Sql 坑
由于公司是1.2 的spark,所以我遇见的问题不一定在其他版本存在。
- spark sql 的字段居然是大写区分的,囧,一旦大小写对应不上,sql异常,你就只能眼花了
- spark sql 的错误提示信息太奇葩了,一点也不准确,比如你where 条件的字段名不对,绝对不会提示你哪个字段找不到,直接把select 的n多字段列出来,把你看晕死
- 本地测试时一定要使用多核心模式,否则多线程问题你是看不出来的,并且单核心真心慢死人
- udf function 必须是可序列化到分布式环境的,如果出现无法序列化的问题,多半是你不小心使用了类成员等依赖于某个类的变量或方法,在scala中可放在伴生类中
- spark sql StructField 的 DataType 无法转为 DecimalType, 当我们根据 StructField做处理时,DecimalType暂时无处处理
- spark sql 的 schemaRDD 实际列少于定义,比如使用union all 时如果列数不一样,在运行时会报java.lang.ArrayIndexOutOfBoundsException错误,该异常是不是很好看懂呢?
来源:gitbooks
作者:fs7744
|
|