本帖最后由 pig2 于 2017-2-15 16:52 编辑
问题导读
1.为什么会产生spark sql
2.sparkSQL包含哪些内容?
3.spark运行框架是什么?
4.spark sql包含哪些组件?
5.sparkSQL有哪两个分支?
1.为什么会产生spark sql
随着Spark的发展,其中sparkSQL作为Spark生态的一员继续发展,而不再受限于hive,只是兼容hive;而hive on spark是一个hive的发展计划,该计划将spark作为hive的底层引擎之一,也就是说,hive将不再受限于一个引擎,可以采用map-reduce、Tez、spark等引擎。
Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题:
第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;
第二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支(至于为何相关修改没有合并到Hive主线,我也不太清楚)。
此外,除了兼容HQL、加速现有Hive数据的查询分析以外,Spark SQL还支持直接对原生RDD对象进行关系查询。同时,除了HQL以外,Spark SQL还内建了一个精简的SQL parser,以及一套Scala DSL。也就是说,如果只是使用Spark SQL内建的SQL方言或Scala DSL对原生RDD对象进行关系查询,用户在开发Spark应用时完全不需要依赖Hive的任何东西。
总结:
从这里说明spark sql是为了实现hive兼容计划,也就是说hive可以使用spark引擎,也就是说我们通过spark具体来说通过spark sql来操作hive。
2.为什么sparkSQL的性能得到提升
这个简单了解即可,
1.内存列存储(In-Memory Columnar Storage)
2.字节码生成技术
3.scala代码优化
3.sparkSQL组成
sparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:
1.core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
2.catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
3.hive对hive数据的处理
4.hive-ThriftServer提供CLI和JDBC/ODBC接口
在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是catalyst的一个设计图:
其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:
1.sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;
2. Analyzer,主要完成绑定工作,将不同来源的Unresolved LogicalPlan和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolved LogicalPlan;
3. optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;
4. Planner将LogicalPlan转换成PhysicalPlan;
5. CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
这些组件的基本实现方法:
6. 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。
7. Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的属性而转换成resolved LogicalPlan;
8. optimizer使用Optimization Rules,对resolved LogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimized LogicalPlan;
9. Planner使用Planning Strategies,对optimized LogicalPlan进行转换,转换成可以执行的物理计划。
4.sparkSQL组件之解析
sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);
这里介绍sqlContext的关键的概念和组件。
概念:
o LogicalPlan
组件:
o SqlParser
o Analyzer
o Optimizer
o Planner
LogicalPlan
在sparkSQL的运行架构中,LogicalPlan贯穿了大部分的过程,其中catalyst中的SqlParser、Analyzer、Optimizer都要对LogicalPlan进行操作。LogicalPlan的定义如下:
[mw_shl_code=scala,true]abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
self: Product =>
case class Statistics( sizeInBytes: BigInt )
lazy val statistics: Statistics = {
if (children.size == 0) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.") }
Statistics(
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product) }
/**
* Returns the set of attributes that this node takes as * input from its children. */
lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
/**
* Returns true if this expression and all its children have been resolved to a specific schema
* and false if it is still contains any unresolved placeholders. Implementations of LogicalPlan
* can override this (e.g.
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
* should return `false`).
*/
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
/**
* Returns true if all its children of this query plan have been resolved.
*/
def childrenResolved: Boolean = !children.exists(!_.resolved)
/**
* Optionally resolves the given string to a [[NamedExpression]] using the input from all child * nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/ def resolveChildren(name: String): Option[NamedExpression] = resolve(name, children.flatMap(_.output))
/**
* Optionally resolves the given string to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/ def resolve(name: String): Option[NamedExpression] = resolve(name, output)
/**
Performs attribute resolution given a name and a sequence of possible attributes.
*
/ protected def resolve(name: String, input: Seq[Attribute]):
Option[NamedExpression] = {
val parts = name.split("\\.")
val options = input.flatMap { option =>
val remainingParts =
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1)
else parts
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil } options.distinct match {
case Seq((a, Nil)) => Some(a)
// One match, no nested fields, use it.
// One match, but we also need to extract the requested nested field.
case Seq((a, nestedFields)) => a.dataType match { case StructType(fields) => Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)()) case _ => None
// Don't know how to resolve these field references } case Seq() => None
// No matches.
case ambiguousReferences => throw new TreeNodeException( this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
}
}
}[/mw_shl_code]
在LogicalPlan里维护者一套统计数据和属性数据,也提供了解析方法。同时延伸了三种类型的LogicalPlan:
1 LeafNode:对应于trees.LeafNode的LogicalPlan
2 UnaryNode:对应于trees.UnaryNode的LogicalPlan
3 BinaryNode:对应于trees.BinaryNode的LogicalPlan
而对于SQL语句解析时,会调用和SQL匹配的操作方法来进行解析;这些操作分四大类,最终生成LeafNode、UnaryNode、BinaryNode中的一种:
1 basicOperators:一些数据基本操作,如Ioin、Union、Filter、Project、Sort
2 commands:一些命令操作,如SetCommand、CacheCommand
3 partitioning:一些分区操作,如RedistributeData
4 ScriptTransformation:对脚本的处理,如ScriptTransformation
5 LogicalPlan类的总体架构如下所示
SqlParser
SqlParser的功能就是将SQL语句解析成Unresolved LogicalPlan。现阶段的SqlParser语法解析功能比较简单,支持的语法比较有限。其解析过程中有两个关键组件和一个关键函数:
1词法读入器SqlLexical,其作用就是将输入的SQL语句进行扫描、去空、去注释、校验、分词等动作。
2 SQL语法表达式query,其作用定义SQL语法表达式,同时也定义了SQL语法表达式的具体实现,即将不同的表达式生成不同sparkSQL的Unresolved LogicalPlan。
3 函数phrase(),上面个两个组件通过调用phrase(query)(new lexical.Scanner(input)),完成对SQL语句的解析;在解析过程中,SqlLexical一边读入,一边解析,如果碰上生成符合SQL语法的表达式时,就调用相应SQL语法表达式的具体实现函数,将SQL语句解析成Unresolved LogicalPlan。
下面看看sparkSQL的整个解析过程和相关组件:
解析过程
首先,在sqlContext中使用下面代码调用catalyst.SqlParser:
[mw_shl_code=scala,true]/*源自 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)[/mw_shl_code]
然后,直接在SqlParser的apply方法中对输入的SQL语句进行解析,解析功能的核心代码就是:
phrase(query)(new lexical.Scanner(input))
[mw_shl_code=scala,true]/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
class SqlParser extends StandardTokenParsers with PackratParsers {
def apply(input: String): LogicalPlan = {
if (input.trim.toLowerCase.startsWith("set")) {
//set设置项的处理
...... }
else {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
} } }
......[/mw_shl_code]
可以看得出来,该语句就是调用phrase()函数,使用SQL语法表达式query,对词法读入器lexical读入的SQL语句进行解析,其中词法读入器lexical通过重写语句:override val lexical = new SqlLexical(reservedWords) 调用扩展了功能的SqlLexical。其定义:
[mw_shl_code=scala,true]/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
this.getClass .
getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword].str)
override val lexical = new SqlLexical(reservedWords)[/mw_shl_code]
为了加深对SQL语句解析过程的理解,让我们看看下面这个简单数字表达式解析过程来说明:
[mw_shl_code=scala,true]import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical._
object mylexical extends StandardTokenParsers with PackratParsers {
//定义分割符 lexical.delimiters ++= List(".", ";", "+", "-", "*")
//定义表达式,支持加,减,乘
lazy val expr: PackratParser[Int] = plus | minus | multi
//加法表示式的实现
lazy val plus: PackratParser[Int] = num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt}
//减法表达式的实现 lazy val minus: PackratParser[Int] = num ~ "-" ~ num ^^ { case n1 ~ "-" ~ n2 => n1.toInt - n2.toInt}
//乘法表达式的实现 lazy val multi: PackratParser[Int] = num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt} lazy val num = numericLit def parse(input: String) = {
//定义词法读入器myread,并将扫描头放置在input的首位
val myread = new PackratReader(new lexical.Scanner(input))
print("处理表达式 " + input)
phrase(expr)(myread)
match { case Success(result, _) => println(" Success!");
println(result); Some(result) case n => println(n);
println("Err!"); None
}
}
def main(args: Array[String]) {
val prg = "6 * 3" :: "24-/*aaa*/4" :: "a+5" :: "21/3" :: Nil
prg.map(parse)
}
}
[/mw_shl_code]运行结果:
[mw_shl_code=bash,true]处理表达式 6 * 3 Success! //lexical对空格进行了处理,得到6*3
18 //6*3符合乘法表达式,调用n1.toInt * n2.toInt,得到结果并返回
处理表达式 24-/*aaa*/4 Success!
//lexical对注释进行了处理,得到20-4 20
//20-4符合减法表达式,调用n1.toInt - n2.toInt,得到结果并返回
处理表达式 a+5[1.1] failure: number expected
//lexical在解析到a,发现不是整数型,故报错误位置和内容
a+5
^
Err!
处理表达式 21/3[1.3] failure: ``*'' expected but ErrorToken(illegal character)
found
//lexical在解析到/,发现不是分割符,故报错误位置和内容
21/3
^
Err![/mw_shl_code]
在运行的时候,首先对表达式 6 * 3 进行解析,词法读入器myread将扫描头置于6的位置;当phrase()函数使用定义好的数字表达式expr处理6 * 3的时候,6 * 3每读入一个词法,就和expr进行匹配,如读入6*和expr进行匹配,先匹配表达式plus,*和+匹配不上;就继续匹配表达式minus,*和-匹配不上;就继续匹配表达式multi,这次匹配上了,等读入3的时候,因为3是num类型,就调用调用n1.toInt * n2.toInt进行计算。
注意,这里的expr、plus、minus、multi、num都是表达式,|、~、^^是复合因子,表达式和复合因子可以组成一个新的表达式,如plus(num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt})就是一个由num、+、num、函数构成的复合表达式;而expr(plus | minus | multi)是由plus、minus、multi构成的复合表达式;复合因子的含义定义在类scala/util/parsing/combinator/Parsers.scala,下面是几个常用的复合因子:
1 p | q p失败则q,返回第一个成功的结果
2 p ^^ f 如果p成功,将函数f应用到p的结果上
3 p ^? f 如果p成功,如果函数f可以应用到p的结果上的话,就将p的结果用f进行转换
针对上面的6 * 3使用的是multi表达式(num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}),其含义就是:num后跟*再跟num,如果满足就将使用函数n1.toInt * n2.toInt。
到这里为止,大家应该明白整个解析过程了吧,。SqlParser的原理和这个表达式解析器使用了一样的原理,只不过是定义的SQL语法表达式query复杂一些,使用的词法读入器更丰富一些而已。下面分别介绍一下相关组件SqlParser、SqlLexical、query。
SqlParser
首先,看看SqlParser的UML图:
其次,看看SqlParser的定义,SqlParser继承自类StandardTokenParsers和特质PackratParsers:
其中,PackratParsers:
1 扩展了scala.util.parsing.combinator.Parsers所提供的parser,做了内存化处理;
2 Packrat解析器实现了回溯解析和递归下降解析,具有无限先行和线性分析时的优势。同时,也支持左递归词法解析。
3 从Parsers中继承出来的class或trait都可以使用PackratParsers,如:object MyGrammar extends StandardTokenParsers with PackratParsers;
4 PackratParsers将分析结果进行缓存,因此,PackratsParsers需要PackratReader(内存化处理的Reader)作为输入,程序员可以手工创建PackratReader,如production(new PackratReader(new lexical.Scanner(input))),更多的细节参见scala库中/scala/util/parsing/combinator/PackratParsers.scala文件。
StandardTokenParsers是最终继承自Parsers
5 增加了词法的处理能力(Parsers是字符处理),在StdTokenParsers中定义了四种基本词法:
o keyword tokens
o numeric literal tokens
o string literal tokens
o identifier tokens
6 定义了一个词法读入器lexical,可以进行词法读入
SqlParser在进行解析SQL语句的时候是调用了PackratParsers中phrase():
[mw_shl_code=scala,true]/*源自 scala/util/parsing/combinator/PackratParsers.scala */
/**
* A parser generator delimiting whole phrases (i.e. programs).
*
* Overridden to make sure any input passed to the argument parser
* is wrapped in a `PackratReader`.
*/
override def phrase[T](p: Parser[T]) = {
val q = super.phrase(p)
new PackratParser[T] {
def apply(in: Input) = in match {
case in: PackratReader[_] => q(in)
case in => q(new PackratReader(in))
}
}
}[/mw_shl_code]
在解析过程中,一般会定义多个表达式,如上面例子中的plus | minus | multi,一旦前一个表达式不能解析的话,就会调用下一个表达式进行解析:
[mw_shl_code=scala,true]/*源自 scala/util/parsing/combinator/Parsers.scala */
def append[U >: T](p0: => Parser[U]): Parser[U] = { lazy val p = p0
// lazy argument
Parser{ in => this(in) append p(in)
}
}[/mw_shl_code]
表达式解析正确后,具体的实现函数是在PackratParsers中完成:
[mw_shl_code=scala,true]/*源自 scala/util/parsing/combinator/PackratParsers.scala */
def memo[T](p: super.Parser[T]): PackratParser[T] = {
new PackratParser[T] {
def apply(in: Input) = {
val inMem = in.asInstanceOf[PackratReader[Elem]]
//look in the global cache if in a recursion
val m = recall(p, inMem) m match {
//nothing has been done due to recall
case None =>
val base = LR(Failure("Base Failure",in), p, None)
inMem.lrStack = base::inMem.lrStack
//cache base result
inMem.updateCacheAndGet(p,MemoEntry(Left(base)))
//parse the input
val tempRes = p(in)
//the base variable has passed equality tests with the cache
inMem.lrStack = inMem.lrStack.tail
//check whether base has changed, if yes, we will have a
head base.head match {
case None => /*simple result*/ inMem.updateCacheAndGet(p,MemoEntry(Right(tempRes))) tempRes
case s@Some(_) =>
/*non simple result*/
base.seed = tempRes
//the base variable has passed equality tests with the cache
val res = lrAnswer(p, inMem, base)
res
}
case Some(mEntry) => {
//entry found in cache
mEntry match {
case MemoEntry(Left(recDetect)) => {
setupLR(p, inMem, recDetect)
//all setupLR does is change the heads of the recursions, so the seed will stay the same
recDetect match {case LR(seed, _, _) =>
seed.asInstanceOf[ParseResult[T]]} }
case MemoEntry(Right(res: ParseResult[_])) =>
res.asInstanceOf[ParseResult[T]]
}
}
}
}
}
}
[/mw_shl_code]
StandardTokenParsers增加了词法处理能力,SqlParers定义了大量的关键字,重写了词法读入器,将这些关键字应用于词法读入器。
SqlLexical
词法读入器SqlLexical扩展了StdLexical的功能,首先增加了大量的关键字:
[mw_shl_code=scala,true]/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
......
protected val SUBSTR = Keyword("SUBSTR")
protected val SUBSTRING = Keyword("SUBSTRING")[/mw_shl_code]
其次丰富了分隔符、词法处理、空格注释处理:
最后看看SQL语法表达式query。
query
SQL语法表达式支持3种操作:select、insert、cache
而这些操作还有具体的定义,如select,这里开始定义了具体的函数,将SQL语句转换成构成Unresolved LogicalPlan的一些Node:
Analyzer
Analyzer的功能就是对来自SqlParser的Unresolved LogicalPlan中的UnresolvedAttribute项和UnresolvedRelation项,对照catalog和FunctionRegistry生成Analyzed LogicalPlan。Analyzer定义了5大类14小类的rule:
[mw_shl_code=scala,true]/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala */
val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once, NewRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once, (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
ResolveSortReferences ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
typeCoercionRules :_*),
Batch("Check Analysis", Once, CheckResolution),
Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) )[/mw_shl_code]
MultiInstanceRelations
o NewRelationInstances
1 CaseInsensitiveAttributeReferences
o LowercaseAttributeReferences
2 Resolution
o ResolveReferences
o ResolveRelations
o ResolveSortReferences
o NewRelationInstances
o ImplicitGenerate
o StarExpansion
o ResolveFunctions
o GlobalAggregates
o UnresolvedHavingClauseAttributes
o typeCoercionRules
3Check Analysis
o CheckResolution
4 AnalysisOperators
o EliminateAnalysisOperators
这些rule都是使用transform对Unresolved LogicalPlan进行操作,其中typeCoercionRules是对HiveQL语义进行处理,在其下面又定义了多个rule:PropagateTypes、ConvertNaNs、WidenTypes、PromoteStrings、BooleanComparisons、BooleanCasts、StringToIntegralCasts、FunctionArgumentConversion、CaseWhenCoercion、Division,同样了这些rule也是使用transform对Unresolved LogicalPlan进行操作。这些rule操作后,使得LogicalPlan的信息变得丰满和易懂。下面拿其中的两个rule来简单介绍一下:
比如rule之ResolveReferences,最终调用LogicalPlan的resolveChildren对列名给一名字和序号,如name#67之列的,这样保持列的唯一性:
又比如rule之StarExpansion,其作用就是将Select * Fom tbl中的*展开,赋予列名:
Optimizer
Optimizer的功能就是将来自Analyzer的Analyzed LogicalPlan进行多种rule优化,生成Optimized LogicalPlan。Optimizer定义了3大类12个小类的优化rule:
[mw_shl_code=scala,true]/*源自
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Combine Limits", FixedPoint(100), CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, LikeSimplification, BooleanSimplification, SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, ColumnPruning) :: Nil
}[/mw_shl_code]
1Combine Limits 合并Limit
o CombineLimits:将两个相邻的limit合为一个
2ConstantFolding 常量叠加
o NullPropagation 空格处理
o ConstantFolding:常量叠加
o LikeSimplification:like表达式简化
o BooleanSimplification:布尔表达式简化
o SimplifyFilters:Filter简化
o SimplifyCasts:Cast简化
o SimplifyCaseConversionExpressions:CASE大小写转化表达式简化
3 Filter Pushdown Filter下推
o CombineFilters Filter合并
o PushPredicateThroughProject 通过Project谓词下推
o PushPredicateThroughJoin 通过Join谓词下推
o ColumnPruning 列剪枝
这些优化rule都是使用transform对LogicalPlan进行操作,如合并、删除冗余、简化、剪枝等,是整个LogicalPlan变得更简洁更高效。
比如将两个相邻的limit进行合并,可以使用CombineLimits。象sql("select * from (select * from src limit 5)a limit 3 ") 这样一个SQL语句,会将limit 5和limit 3进行合并,只剩一个一个limit 3。
[mw_shl_code=scala,true]
/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
Limit(If(LessThan(ne, le), ne, le), grandChild)
}
}[/mw_shl_code]
又比如Null值的处理,可以使用NullPropagation处理。象sql("select count(null) from src where key is not null")这样一个SQL语句会转换成sql("select count(0) from src where key is not null")来处理。
[mw_shl_code=scala,true]/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object NullPropagation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsUp {
case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType) case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
case e @ EqualNullSafe(Literal(null, _), r) => IsNull(r)
case e @ EqualNullSafe(l, Literal(null, _)) => IsNull(l) ......
}
}
}[/mw_shl_code]
对于具体的优化方法可以使用下一章所介绍的hive/console调试方法进行调试,用户可以使用自定义的优化函数,也可以使用sparkSQL提供的优化函数。使用前先定义一个要优化查询,然后查看一下该查询的Analyzed LogicalPlan,再使用优化函数去优化,将生成的Optimized LogicalPlan和Analyzed LogicalPlan进行比较,就可以看到优化的效果。
5.spark sql运行架构
对于sql语句,spark与传统数据库解析是差不多的。将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。
如下面是传统数据库sql语句对应的tree
sparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。
Tree介绍
Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
1. Logical Plans、Expressions、Physical Operators都可以使用Tree表示
2. Tree的具体操作是通过TreeNode来实现的
o sparkSQL定义了catalyst.trees的日志,通过这个日志可以形象的表示出树的结构
o TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)进行操作
o 有了TreeNode,通过Tree中各个TreeNode之间的关系,可以对Tree进行遍历操作,如使用transformDown、transformUp将Rule应用到给定的树段,然后用结果替代旧的树段;也可以使用transformChildrenDown、transformChildrenUp对一个给定的节点进行操作,通过迭代将Rule应用到该节点以及子节点。
3.TreeNode可以细分成三种类型的Node:
o UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
o BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
o LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand
Rule介绍
1. Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
2. Rule在sparkSQL的Analyzer、Optimizer、SparkPlan等各个组件中都有应用到
3. Rule是一个抽象类,具体的Rule实现是通过RuleExecutor完成
4. Rule通过定义batch和batchs,可以简便的、模块化地对Tree进行transform操作
5. Rule通过定义Once和FixedPoint,可以对Tree进行一次操作或多次操作(如对某些Tree进行多次迭代操作的时候,达到FixedPoint次数迭代或达到前后两次的树结构没变化才停止操作,具体参看RuleExecutor.apply)
拿个简单的例子,在处理由解析器(SqlParse)生成的LogicPlan Tree的时候,在Analyzer中就定义了多种Rules应用到LogicPlan Tree上。
应用示意图:
Analyzer中使用的Rules,定义了batches,由多个batch构成,如MultiInstanceRelations、Resolution、Check Analysis、AnalysisOperators等构成;每个batch又有不同的rule构成,如Resolution由ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等构成;每个rule又有自己相对应的处理函数,可以具体参看Analyzer中的ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances函数;同时要注意的是,不同的rule应用次数是不同的:如CaseInsensitiveAttributeReferences这个batch中rule只应用了一次(Once),而Resolution这个batch中的rule应用了多次(fixedPoint = FixedPoint(100),也就是说最多应用100次,除非前后迭代结果一致退出)。
在整个sql语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在sparkSQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。
知道了sparkSQL的各个过程的基本处理方式,下面来看看sparkSQL的运行过程。sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);hiveContext现在支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。关于sqlContext和hiveContext的具体看下面
SQLContext成员:
Catalog
一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类。
SqlParser
Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan
Analyzer
logical plan的语法分析器
Optimizer
logical Plan的优化器
LogicalPlan
逻辑计划,由catalyst的TreeNode组成,可以看到有3种语法树
SparkPlanner
包含不同策略的优化策略来优化物理执行计划
QueryExecution
sql执行的环境上下文
就是这些对象组成了Spark SQL的运行时,看起来很酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行运行时。
那这些对象是怎么相互协作来执行sql语句的呢?
sqlContext的运行过程
sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:
话不多说,先上图,这个图我用一个在线作图工具process on话的:
核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。
先概括一下,大致的执行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD
更具体的执行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD
详细分析:
sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText)) //parseSql(sqlText)对sql语句进行语法解析
}
else
{
sys.error(s"Unsupported SQL dialect: $dialect")
}
}[/mw_shl_code]
sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成Unresolved LogicalPlan。
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)[/mw_shl_code]
类SchemaRDD继承自SchemaRDDLike
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala */
class SchemaRDD( @transient val sqlContext: SQLContext, @transient val baseLogicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike[/mw_shl_code]
SchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成Unresolved LogicalPlan,这里的baseLogicalPlan就是指Unresolved LogicalPlan。
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala */private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient val baseLogicalPlan: LogicalPlan
private[sql] def baseSchemaRDD: SchemaRDD
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)[/mw_shl_code]
sqlContext.executePlan做了什么呢?它调用了QueryExecution类
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }[/mw_shl_code]
QueryExecution类的定义:
[mw_shl_code=scala,true]/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */protected abstract class QueryExecution {
def logical: LogicalPlan
//对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
//对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
lazy val optimizedPlan = optimizer(analyzed) // 将optimized LogicalPlan转换成PhysicalPlan
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
}
// PhysicalPlan执行前的准备工作,生成可执行的物理计划
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
//执行可执行物理计划
lazy val toRdd: RDD[Row] = executedPlan.execute()
......
}[/mw_shl_code]
sqlContext总的一个过程如下图所示:
1. SQL语句经过SqlParse解析成Unresolved LogicalPlan;
2. 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolved LogicalPlan;
3. 使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;
4. 使用SparkPlan将LogicalPlan转换成PhysicalPlan;
5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6. 使用execute()执行可执行物理计划;
7. 生成SchemaRDD。
总结:
通过分析SQLContext我们知道了Spark SQL都包含了哪些组件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含Physical Plan),QueryExecution.
通过调试代码,知道了Spark SQL的执行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD
hiveContext的运行过程
在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。
从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:
[mw_shl_code=bash,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override def sql(sqlText: String): SchemaRDD = {
// 使用spark.sql.dialect定义采用的语法解析器
if (dialect == "sql") { super.sql(sqlText)
//如果使用sql解析器,则使用sqlContext的sql方法
}
else if (dialect == "hiveql") {
//如果使用和hiveql解析器,则使用HiveQl.parseSql
new SchemaRDD(this, HiveQl.parseSql(sqlText))
}
else {
sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
}
}[/mw_shl_code]
hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect == "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。
[mw_shl_code=scala,true]/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan =
{ try
{
if (条件) {
//非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
.....
}
else
{
val tree = getAst(sql)
if (nativeCommands contains tree.getText)
{
NativeCommand(sql)
}
else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other } } } }
catch { //异常处理
...... }
}[/mw_shl_code]
因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:
1 首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache table、add jar等,将这些语句转换成command类型的LogicalPlan;
2如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成LogicalPlan。
[mw_shl_code=scala,true]/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
/** * Returns the AST for the given SQL string. */
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))[/mw_shl_code]
和sqlContext一样,类SchemaRDD继承自SchemaRDDLike ,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数:
[mw_shl_code=scala,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }[/mw_shl_code]
并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:
[mw_shl_code=applescript,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan = optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
......
}[/mw_shl_code]
所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
hiveContext的catalog,是指向 Hive Metastore:
[mw_shl_code=scala,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
override def lookupRelation( databaseName: Option[String], tableName: String, alias: Option[String] = None): LogicalPlan = {
LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
}
}[/mw_shl_code]
hiveContext的analyzer,使用了新的catalog和functionRegistry:
[mw_shl_code=scala,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false)[/mw_shl_code]
hiveContext的planner,使用新定义的hivePlanner:
[mw_shl_code=scala,true]/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
@transient override protected[sql] val planner =[/mw_shl_code]
所以hiveContext总的一个过程如下图所示:
1. SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
2. 使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan;
3. 使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;
4. 使用hivePlanner将LogicalPlan转换成PhysicalPlan;
5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6. 使用execute()执行可执行物理计划;
7. 执行后,使用map(_.copy)将结果导入SchemaRDD。
hiveContxt还有很多针对hive的特性,更细节的内容参看源码。
相关内容
sparkSQL1.1入门线路指导
http://www.aboutyun.com/forum.php?mod=viewthread&tid=9958
|
|