xioaxu790 发表于 2014-7-30 19:22:17

整理对Spark SQL的理解

问题导读
1、Spark支持哪些特性?
2、Spark和Hadoop的对比是什么?
3、如何理解Spark的逻辑执行计划?

static/image/hrline/4.gif


Catalyst
Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架。
目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail。

以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程。



Catalyst定位
其他系统如果想基于Spark做一些类sql、标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器、执行计划树结构、逻辑执行计划的处理规则体系等类体系来实现执行计划的解析、生成、优化、映射工作。
对应上图中,主要是左侧的TreeNodelib及中间三次转化过程中涉及到的类结构都是Catalyst提供的。至于右侧物理执行计划映射生成过程,物理执行计划基于成本的优化模型,具体物理算子的执行都由系统自己实现。

Catalyst现状
在解析器方面提供的是一个简单的scala写的sql parser,支持语义有限,而且应该是标准sql的。
在规则方面,提供的优化规则是比较基础的(和Pig/Hive比没有那么丰富),不过一些优化规则其实是要涉及到具体物理算子的,所以部分规则需要在系统方那自己制定和实现(如spark-sql里的SparkStrategy)。
Catalyst也有自己的一套数据类型。

下面介绍Catalyst里几套重要的类结构。

TreeNode体系
TreeNode是Catalyst执行计划表示的数据结构,是一个树结构,具备一些scala collection的操作能力和树遍历能力。这棵树一直在内存里维护,不会dump到磁盘以某种格式的文件存在,且无论在映射逻辑执行计划阶段还是优化逻辑执行计划阶段,树的修改是以替换已有节点的方式进行的。

TreeNode,内部带一个children: Seq表示孩子节点,具备foreach、map、collect等针对节点操作的方法,以及transformDown(默认,前序遍历)、transformUp这样的遍历树上节点,对匹配节点实施变化的方法。
提供UnaryNode,BinaryNode, LeafNode三种trait,即非叶子节点允许有一个或两个子节点。
TreeNode提供的是范型。
TreeNode有两个子类继承体系,QueryPlan和Expression。QueryPlan下面是逻辑和物理执行计划两个体系,前者在Catalyst里有详细实现,后者需要在系统自己实现。Expression是表达式体系,后面章节都会展开介绍。


Tree的transformation实现:
传入PartialFunction,如果与操作符匹配,则节点会被结果替换掉,否则节点不会变动。整个过程是对children递归执行的。

执行计划表示模型
逻辑执行计划
QueryPlan继承自TreeNode,内部带一个output: Seq,具备transformExpressionDown、transformExpressionUp方法。
在Catalyst中,QueryPlan的主要子类体系是LogicalPlan,即逻辑执行计划表示。其物理执行计划表示由使用方实现(spark-sql项目中)。
LogicalPlan继承自QueryPlan,内部带一个reference:Set,主要方法为resolve(name:String): Option,用于分析生成对应的NamedExpression。
LogicalPlan有许多具体子类,也分为UnaryNode, BinaryNode, LeafNode三类,具体在org.apache.spark.sql.catalyst.plans.logical路径下。




逻辑执行计划实现
LeafNode主要子类是Command体系:


各command的语义可以从子类名字看出,代表的是系统可以执行的non-query命令,如DDL。

UnaryNode的子类:



BinaryNode的子类:



物理执行计划
另一方面,物理执行计划节点在具体系统里实现,比如spark-sql工程里的SparkPlan继承体系。



物理执行计划实现
每个子类都要实现execute()方法,大致有以下实现子类(不全)。
LeadNode的子类:



UnaryNode的子类:



BinaryNode的子类:


提到物理执行计划,还要提一下Catalyst提供的分区表示模型。

执行计划映射
Catalyst还提供了一个QueryPlanner]抽象类,需要子类制定一批strategies: Seq,其apply方法也是类似根据制定的具体策略来把逻辑执行计划算子映射成物理执行计划算子。由于物理执行计划的节点是在具体系统里实现的,所以QueryPlanner及里面的strategies也需要在具体系统里实现。



在spark-sql项目中,SparkStrategies继承了QueryPlanner,内部制定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等几种策略,每种策略接受的都是一个LogicalPlan,生成的是Seq,每个SparkPlan理解为具体RDD的算子操作。
比如在BasicOperators这个Strategy里,以match-case匹配的方式处理了很多基本算子(可以一对一直接映射成RDD算子),如下:
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil


Expression体系
Expression,即表达式,指不需要执行引擎计算,而可以直接计算或处理的节点,包括Cast操作,Projection操作,四则运算,逻辑操作符运算等。
具体可以参考org.apache.spark.sql.expressionspackage下的类。

Rules体系
凡是需要处理执行计划树(Analyze过程,Optimize过程,SparkStrategy过程),实施规则匹配和节点处理的,都需要继承RuleExecutor抽象类。
RuleExecutor内部提供了一个Seq,里面定义的是该RuleExecutor的处理步骤。每个Batch代表着一套规则,配备一个策略,该策略说明了迭代次数(一次还是多次)。
protected case class Batch(name: String, strategy: Strategy, rules: Rule*)
Rule]是一个抽象类,子类需要复写apply(plan: TreeType)方法来制定处理逻辑。
RuleExecutor的apply(plan: TreeType): TreeType方法会按照batches顺序和batch内的Rules顺序,对传入的plan里的节点迭代处理,处理逻辑为由具体Rule子类实现。


Hive相关
Hive支持方式
Spark SQL对hive的支持是单独的spark-hive项目,对Hive的支持包括HQL查询、hive metaStore信息、hive SerDes、hive UDFs/UDAFs/ UDTFs,类似Shark。
只有在HiveContext下通过hive api获得的数据集,才可以使用hql进行查询,其hql的解析依赖的是org.apache.hadoop.hive.ql.parse.ParseDriver类的parse方法,生成Hive AST。

实际上sql和hql,并不是一起支持的。可以理解为hql是独立支持的,能被hql查询的数据集必须读取自hive api。下图中的parquet、json等其他文件支持只发生在sql环境下(SQLContext)。



Hive on Spark
Hive官方提出了Hive onSpark的JIRA。Shark结束之后,拆分为两个方向:



Spark SQL里现在对Hive的支持,体现在复用了Hive的meta store数据、hql解析、UDFs、SerDes,在执行DDL和某些简单命令的时候,调的是hive客户端。hql翻译前会处理一些与query主体无关的set, cache, addfile等命令,然后调用ParserDriver翻译hql,并把AST转换成Catalyst的LogicalPlan,后续优化、物理执行计划翻译及执行过程,与Sql一样使用的是Catalyst提供的内容,执行引擎是Spark。在整个结合过程中,ASTNode映射成LogicalPlan是重点。


而Hive社区的Hive on Spark会怎样实现,具体参考jira里的设计文档。

与Shark对比,


Shark多依赖了Hive的执行计划相关模块以及CLI。CLI和JDBC部分是Spark SQL后续打算支持的。Shark额外提供的对Table数据行转列、序列化、压缩存内存的模块,也被拿到了Spark Sql的sql工程里。

以上说明了Shark与Spark SQL Hive的区别,对Shark这个项目继承性的理解。
而Spark SQL Hive与Hive社区 Hive on Spark的区别需要具体参考jira里的设计文档,我也还没有读过。

spark-hive工程



解析过程
HiveQl.parseSql()把hql解析成logicalPlan。解析过程,提取出一些command,包括:
2set key=value
2cache table
2uncache table
2add jar
2add file
2dfs
2source
然后由Hive的ParseDriver把hql解析成AST,得到ASTNode,
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))

把Node转化为Catalyst的LogicalPlan,转化逻辑较复杂,也是Sparksql对hql支持的最关键部分。详见HiveQl.nodeToPlan(node: Node):LogicalPlan方法。
大致转换逻辑包括:
处理TOK_EXPLAIN和TOK_DESCTABLE
处理TOK_CREATETABLE,包括创建表时候一系列表的设置TOK_XXX
处理TOK_QUERY,包括TOK_SELECT,TOK_WHERE,TOK_GROUPBY,TOK_HAVING,TOK_SORTEDBY,TOK_LIMIT等等,对FROM后面跟的语句进行nodeToRelation处理。
处理TOK_UNION
对Hive AST树结构和表示不熟悉,所以此处略过。

Analyze过程
metadata交互
Catalog类为HiveMetastoreCatalog,通过hive的conf生成client(org.apache.hadoop.hive.ql.metadata.Hive,用于与MetaStore通信,获得metadata以及进行DDL操作),catalog的lookupRelation方法里面,client.getTable()得到表信息,client.getAddPartitionsForPruner()得到分区信息。

udf相关
FunctionRegistry类为HiveFunctionRegistry,根据方法名,通过hive的相关类去查询该方法,检查是否具有该方法,是UDF,还是UDAF(aggregation),或是UDTF(table)。这里只做已有udf的查询,不做新方法的include。
与Catalyst的Expression对应继承关系如下:



Inspector相关
HiveInspectors提供了几个映射数据类型和ObjectInspetor子类的方法,包括PrimitiveObjectInspector,ListObjectInspector,MapObjectInspector,StructObjectInspector四种

Optimizer过程
在做优化前,会尝试对之前生成的逻辑执行计划进行createtabl操作,因为执行的hql可能是“CREATE TABLE XXX”,这部分处理在HiveMetastoreCatalog的CreateTables单例里,继承了Rule。
以及PreInsertionCasts处理,也是HiveMetastoreCatalog里的单例,继承了Rule。

之后的optimizer过程同SQLContext里,用的是同一个Catalyst提供的Optimizer类。

Planner及执行过程
HiveContext继承自SQLContext,其QueryExecution也继承自SQLContext的QueryExecution。后续执行计划优化、物理执行计划翻译、处理及执行过程同SQL的处理逻辑是一致的。

翻译物理执行计划的时候,hive planner里制定了些特定的策略,与SparkPlanner稍有不同。



多了Scripts,DataSinks,HiveTableScans和HiveCommandStrategy四种处理物理执行计划的策略(见HiveStrategies)。
1.Scripts,用于处理那种hive命令行执行脚本的情况。实现方式是使用ProcessBuilder新起一个JVM进程的方式,用”/bin/bash –c scripts”的方式执行脚本并获取输出流数据,转化为Catalyst Row数据格式。
2.DataSinks,用于把数据写入到Hive表的情况。里面涉及到一些hive读写的数据格式转化、序列化、读配置等工作,最后通过SparkContext的runJob接口,提交作业。
3.HiveTableScans,用于对hive table进行扫描,支持使用谓词的分区裁剪(Partition pruning predicates are detected and applied)。
4.HiveCommandStrategy,用于执行native command和describe command。我理解是这种命令是直接调hive客户端单机执行的,因为可能只与meta data打交道。

toRDD: RDD处理也有少许区别,返回RDD的时候,对每个元素做了一次拷贝。


SQL Core
Spark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似sql里的”Table”,对其进行sql查询。这里面主要分两部分,一是生成SchemaRD,二是执行查询。

生成SchemaRDD
如果是spark-hive项目,那么读取metadata信息作为Schema、读取hdfs上数据的过程交给Hive完成,然后根据这俩部分生成SchemaRDD,在HiveContext下进行hql()查询。

对于Spark SQL来说,
数据方面,RDD可以来自任何已有的RDD,也可以来自支持的第三方格式,如json file、parquet file。
SQLContext下会把带case class的RDD隐式转化为SchemaRDD
implicit def createSchemaRDD(rdd: RDD) =
new SchemaRDD(this,
SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))

ExsitingRdd单例里会反射出case class的attributes,并把RDD的数据转化成Catalyst的GenericRow,最后返回RDD,即一个SchemaRDD。这里的具体转化逻辑可以参考ExsitingRdd的productToRowRdd和convertToCatalyst方法。
之后可以进行SchemaRDD提供的注册table操作、针对Schema复写的部分RDD转化操作、DSL操作、saveAs操作等等。

Row和GenericRow是Catalyst里的行表示模型
Row用Seq来表示values,GenericRow是Row的子类,用数组表示values。Row支持数据类型包括Int, Long, Double, Float, Boolean, Short, Byte, String。支持按序数(ordinal)读取某一个列的值。读取前需要做isNullAt(i: Int)的判断。
各自都有Mutable类,提供setXXX(i: int, value: Any)修改某序数上的值。

层次结构




下图大致对比了Pig,Spark SQL,Shark在实现层次上的区别,仅做参考。








查询流程
SQLContext里对sql的一个解析和执行流程:
1.第一步parseSql(sql: String),simple sql parser做词法语法解析,生成LogicalPlan。

2.第二步analyzer(logicalPlan),把做完词法语法解析的执行计划进行初步分析和映射,
目前SQLContext内的Analyzer由Catalyst提供,定义如下:
new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)
catalog为SimpleCatalog,catalog是用来注册table和查询relation的。
而这里的FunctionRegistry不支持lookupFunction方法,所以该analyzer不支持Function注册,即UDF。
Analyzer内定义了几批规则:
val batches: Seq = Seq(
Batch("MultiInstanceRelations", Once,
    NewRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once,
    (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
    ResolveReferences ::
    ResolveRelations ::
    NewRelationInstances ::
    ImplicitGenerate ::
    StarExpansion ::
    ResolveFunctions ::
    GlobalAggregates ::
    typeCoercionRules :_*),
Batch("Check Analysis", Once,
    CheckResolution),
Batch("AnalysisOperators", fixedPoint,
    EliminateAnalysisOperators)
)

3.从第二步得到的是初步的logicalPlan,接下来第三步是optimizer(plan)。
Optimizer里面也是定义了几批规则,会按序对执行计划进行优化操作。
val batches =
Batch("Combine Limits", FixedPoint(100),
    CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
    NullPropagation,
    ConstantFolding,
    LikeSimplification,
    BooleanSimplification,
    SimplifyFilters,
    SimplifyCasts,
    SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100),
    CombineFilters,
    PushPredicateThroughProject,
    PushPredicateThroughJoin,
    ColumnPruning) :: Nil


4.优化后的执行计划,还要丢给SparkPlanner处理,里面定义了一些策略,目的是根据逻辑执行计划树生成最后可以执行的物理执行计划树,即得到SparkPlan。
val strategies: Seq =
CommandStrategy(self) ::
TakeOrdered ::
PartialAggregation ::
LeftSemiJoin ::
HashJoin ::
InMemoryScans ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil


5.在最终真正执行物理执行计划前,最后还要进行两次规则,SQLContext里定义这个过程叫prepareForExecution,这个步骤是额外增加的,直接new RuleExecutor进行的。
val batches =
Batch("Add exchange", Once, AddExchange(self)) ::
Batch("Prepare Expressions", Once, new BindReferences) :: Nil

6.最后调用SparkPlan的execute()执行计算。这个execute()在每种SparkPlan的实现里定义,一般都会递归调用children的execute()方法,所以会触发整棵Tree的计算。


其他特性
内存列存储
SQLContext下cache/uncache table的时候会调用列存储模块。
该模块借鉴自Shark,目的是当把表数据cache在内存的时候做行转列操作,以便压缩。

实现类
InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。传入一个SparkPlan(确认了的物理执行计)和一个属性序列,内部包含一个行转列、触发计算并cache的过程(且是lazy的)。

ColumnBuilder针对不同的数据类型(boolean, byte, double, float, int, long, short, string)由不同的子类把数据写到ByteBuffer里,即包装Row的每个field,生成Columns。与其对应的ColumnAccessor是访问column,将其转回Row。

CompressibleColumnBuilder和CompressibleColumnAccessor是带压缩的行列转换builder,其ByteBuffer内部存储结构如下
view plaincopy在CODE上查看代码片派生到我的代码片
*    .--------------------------- Column type ID (4 bytes)
*    |   .----------------------- Null count N (4 bytes)
*    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
*    |   |   |   .------------- Compression scheme ID (4 bytes)
*    |   |   |   |   .--------- Compressed non-null elements
*    V   V   V   V   V
*   +---+---+-----+---+---------+
*   |   |   | ... |   | ... ... |
*+---+---+-----+---+---------+
*\-----------/ \-----------/
*       header         body

CompressionScheme子类是不同的压缩实现



都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。
这里的估算逻辑,来自子类实现的gatherCompressibilityStats方法。

Cache逻辑
cache之前,需要先把本次cache的table的物理执行计划生成出来。
在cache这个过程里,InMemoryColumnarTableScan并没有触发执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。
其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。
真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。

Uncache逻辑
UncacheTable的时候,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。

UDF(暂不支持)
如前面对SQLContext里Analyzer的分析,其FunctionRegistry没有实现lookupFunction。
在spark-hive项目里,HiveContext里是实现了FunctionRegistry这个trait的,其实现为HiveFunctionRegistry,实现逻辑见org.apache.spark.sql.hive.hiveUdfs

Parquet支持
待整理
http://parquet.io/

Specific Docs and Codes:
https://github.com/apache/incubator-parquet-format
https://github.com/apache/incubator-parquet-mr
http://www.slideshare.net/julienledem/parquet-hadoop-summit-2013

JSON支持
SQLContext下,增加了jsonFile的读取方法,而且目前看,代码里实现的是hadoop textfile的读取,也就是这份json文件应该是在HDFS上的。具体这份json文件的载入,InputFormat是TextInputFormat,key class是LongWritable,value class是Text,最后得到的是value部分的那段String内容,即RDD。

除了jsonFile,还支持jsonRDD,例子:
http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

读取json文件之后,转换成SchemaRDD。JsonRDD.inferSchema(RDD)里有详细的解析json和映射出schema的过程,最后得到该json的LogicalPlan。

Json的解析使用的是FasterXML/jackson-databind库,GitHub地址,wiki
把数据映射成Map

Json的支持丰富了Spark SQL数据接入场景。

JDBC支持
Jdbc support branchis under going


SQL92
Spark SQL目前的SQL语法支持情况见SqlParser类。目标是支持SQL92??

1. 基本应用上,sql server 和oracle都遵循sql 92语法标准。
2. 实际应用中大家都会超出以上标准,使用各家数据库厂商都提供的丰富的自定义标准函数库和语法。
3. 微软sql server的sql 扩展叫T-SQL(Transcate SQL).
4. Oracle 的sql 扩展叫PL-SQL.

存在问题
大家可以跟进社区邮件列表,后续待整理。
http://apache-spark-developers-list.1001551.n3.nabble.com/sparkSQL-thread-safe-td7263.html
http://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-td9538.html

总结
以上整理了对Spark SQL各个模块的实现情况,代码结构,执行流程以及自己对Spark SQL的理解。


gxynikita 发表于 2014-7-30 20:29:47

楼主讲的真心很棒呢~

梦回三国 发表于 2015-5-29 14:33:57

楼主对SparkSQL的的解析感觉属于很原理性的知识。但是我感觉对我遇到的问题还是有点不懂。能帮忙解答一下吗?http://www.aboutyun.com/forum.php?mod=viewthread&tid=13400&page=1&extra=#pid94060

小南3707 发表于 2015-6-3 11:10:07

赞!            

wingsless 发表于 2015-6-26 10:07:31

有个问题借贵宝地问问。我现在将hdfs中的文件处理成了DataFrame,但是里面有一列是时间字符串“2015-06-02 21:45:14”这样子的,我需要将其转换成相应的时间类型,不知道怎么写代码。

oveydcc 发表于 2015-10-10 14:31:02

dataframe支持INSERT INTO语句吗????为什么我运行insert语句就报错?
页: [1]
查看完整版本: 整理对Spark SQL的理解