问题导读:
1、如何创建calcite logical Plan?
2、什么负责sql的parse和验证?
3、FlinkRuleSets定义哪些rule?
4、如何将DataStreamRel转换成flink可执行的代码?
Flink Sql总体架构
Table API 和 SQL 如何创建calcite logical Plan (parsing)
两者略有不同
- SQL:SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过validate验证sql语法(结合catalog)。最后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)
- Table API -调用Table API 实际上是创建了很多 Table API 的 LogicalNode (生成了table API的抽象语法树),创建的过程中对会对整个query进行validate。LogicalNode 比如table是CalalogNode,window groupBy之后在select时会创建WindowAggregate和Project,where对应Filter。然后用calcite.RelBuilder翻译成Calcite LogicalPlan。如果是SQL API 将直接用Calcite的Parser进行解释然后validate生成Calcite LogicalPlan。
- sql完全依靠calcite(sql parser)去做语法解析,validate后生成calcite logical plan. 而Table API先自己生成table API的logical plan,再通过calcite relbuilder translation成calcite logical plan。
使用calcite cost-based optimizor 进行优化。也就是说和spark不同, flink 的SQL Parsing, Analysing, Optimizing都是托管给calcite(flink会加入一些optimze rules). Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。
这里的优化规则分为两类,一类是Calcite提供的内置优化规则(如条件下推,剪枝等),再基于flink定制的一些优化rules(根据是streaming还是batch选择rulue)去优化logical Plan。
生成phsyical plan,基于flink里头的rules生成了DataStream Plan(Physical Plan)
将物Physical Plan转成Flink ExecutionPlan, 其通过调用相应的tanslateToPlan()转换和利用CodeGen成Flink的各种算子。
- CodeGen 出的Function以字符串的形式存在。在提交任务后会分发到各个 > TaskManager 中运行,在运行时会使用 [Janino](http://janino-> compiler.github.io/janino/) 编译器编译代码后运行。
逻辑和spark类似,只不过calcite做了catalyst的事(sql parsing,analysis和optimizing)。
TableEnvironment
TableEnvironment.sql() - 负责sql的parse和验证
[mw_shl_code=python,true]def sql(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query, 创建sqlNode组成的AST
val parsed = planner.parse(query)
// validate the sql query
val validated = planner.validate(parsed)
// transform to a relational tree , 从纯AST 语法树转换成 relation expression tress,为optimze做准备
val relational = planner.rel(validated)
new Table(this, LogicalRelNode(relational.rel))
}[/mw_shl_code]
Table.writeToSink()
这个方法中主要调用了StreamTableEnvironment的optimze()和translate()两个方法。 负责在sql()中得到的AST tress的Optimize和生产physical plan的过程。
[mw_shl_code=python,true]override private[flink] def writeToSink[T](
table: Table,
sink: TableSink[T],
queryConfig: QueryConfig): Unit = {
case retractSink: RetractStreamTableSink[_] =>
// retraction sink can always be used
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
translate(
table,
streamQueryConfig,
updatesAsRetraction = true,
withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
retractSink.asInstanceOf[RetractStreamTableSink[Any]]
.emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
}[/mw_shl_code]
StreamTableEnvirnoment
SteamTableEnvironment.optimize()
FlinkRelNode三个children, FlinkLogicalRel, DataStreamRel. DataSetRel
FlinkLogicalRel 对应生产的logical plan node
DataStreamRel对应data stream physical plan
DataSetRel 对应dataset physical plan
DataStreamGroupAggregateRule中
Stream
[mw_shl_code=python,true]// 1. decorrelate
// 转化掉correlated expressions, aka correlated subquery,subquery use Where clause expression refer to variable of table from outer query
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
// 2. convert time indicators
// 转换time的标识符,比如存在rowtime标识的话,我们将会引入TimeMaterializationSqlFunction operator,
//这个operator我们会在codeGen中会用到
val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
// 3. normalize the logical plan
val normRuleSet = getNormRuleSet
val normalizedPlan = if (normRuleSet.iterator().hasNext) {
runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
} else {
convPlan
}
// 4. Optimize the logical Flink plan
// 优化逻辑计划,做逻辑上的优化(如filter, projection push down)。同时将node转换成派生于FlinkLogicalRel的节点
// define logcial plan rules
val logicalOptRuleSet = getLogicalOptRuleSet
//用FlinkConventions.LOGICAL替换traitSet, 作为target output traits
val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
} else {
normalizedPlan
}
// 将optimizd logical plan 转换为physical plan,同时将 节点转换成派生于DataStreamRel的节点
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
} else {
logicalPlan
}[/mw_shl_code]
FlinkRuleSets定义了很多rule, 用于在Optimize过程中
- DATASTREAM_NORM_RULES: 把一些节点进行normalized转换
- LOGICAL_OPT_RULES: logical plan优化rules, 里面包含都是sql语句相关的优化(下推,剪裁), 同时做转换:SQLRel -》FlinkLogicalRel
- DATASTREAM_OPT_RULES: 把logical plan node转换成datastream physical
- plan的rule, FlinkLogicalRel -> DataStreamRel。其中的rules都继承自calcite ConvertRule, 而convertRule:> RelOptRule
ConvertRule
Abstract base class for a rule which converts from one calling convention to another without changing semantics.
ConvertRule有很多继承类(比如DATASTREAM_OPT_RULES中所有,以及LOGICAL_OPT_RULES中转换flinkLogicalPlan的rules), 都实现了其convert()接口,convert()会在onMatch中调用。
ConverterRule.onMatch(RelOptRuleCall call)
relOptRule提供了onMatch(RelOptRuleCall call) 被ConverterRule继承, 目的是当matchs() return true后被通知调用
[mw_shl_code=python,true]public void onMatch(RelOptRuleCall call) {
// rel 是RelOptRuleCall中rels[0] , 代表被convert的original rel
RelNode rel = call.rel(0);
// 如果包含此trait,
if (rel.getTraitSet().contains(inTrait)) {
final RelNode converted = convert(rel);
if (converted != null) {
call.transformTo(converted);
}
}
}
[/mw_shl_code]
- TODO LOGICAL_OPT_RULES中实现flinkLogicalPlan的rule不光实现了convert()还实现了matchs()
- convert()可以用debug走一遍, 看看inTrait到底是什么
Normalize Rules
[mw_shl_code=python,true]/**
* RuleSet to normalize plans for stream / DataStream execution
*/
val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
// Transform window to LogicalWindowAggregate
DataStreamLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE,
// simplify expressions rules
// 这3个rule都是从3种expresion中移除constats(用RexLiteral替换)和多余的cast,
//(如果cast(cast date),inner的input和outer的output类型相同,就能删掉)
ReduceExpressionsRule.FILTER_INSTANCE,
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
//如果`org.apache.calcite.rel.core.Project`中包含over的expression(Projec中包含多个expressions.类似select多个column), 那么就拆分为一个logicalProject和一个LogicalWindow(分离为windowed agg和不相干的部分)
ProjectToWindowRule.PROJECT
)
[/mw_shl_code]
LogicalCalc - A relational expression which computes project expressions and also filters. 它包含了{@link LogicalProject} and {@link LogicalFilter}的功能.
[mw_shl_code=python,true]val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
// calcite内部的一些默认优化
// convert a logical table scan to a relational expression
TableScanRule.INSTANCE,
EnumerableToLogicalTableScan.INSTANCE,
// push a filter into a join
FilterJoinRule.FILTER_ON_JOIN,
// push filter into the children of a join
FilterJoinRule.JOIN,
// push filter through an aggregation
FilterAggregateTransposeRule.INSTANCE,
还有很多....................................................
// flink提供的优化, project和filter下推到表scan
// scan optimization
PushProjectIntoTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
// Unnest rule
LogicalUnnestRule.INSTANCE,
// translate to flink logical rel nodes
FlinkLogicalAggregate.CONVERTER,
FlinkLogicalWindowAggregate.CONVERTER,
FlinkLogicalOverWindow.CONVERTER,
FlinkLogicalCalc.CONVERTER,
FlinkLogicalCorrelate.CONVERTER,
FlinkLogicalIntersect.CONVERTER,
FlinkLogicalJoin.CONVERTER,
FlinkLogicalMinus.CONVERTER,
FlinkLogicalSort.CONVERTER,
FlinkLogicalUnion.CONVERTER,
FlinkLogicalValues.CONVERTER,
FlinkLogicalTableSourceScan.CONVERTER,
FlinkLogicalTableFunctionScan.CONVERTER,
FlinkLogicalNativeTableScan.CONVERTER
)
[/mw_shl_code]
DATASTREAM_OPT_RULES 略, convertRule, 用于转换logical plan到DataStreamRel
DATASTREAM_DECO_RULES
/**
* RuleSet to decorate plans for stream / DataStream execution
转化为三种output表, 实现都是RelOptRule的实现
*/
val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
// retraction rules
DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
DataStreamRetractionRules.ACCMODE_INSTANCE
)
SteamTableEnvironment.translate()
translate()在TableEnvironment.writeToSink中被call,主要逻辑是将DataStreamRel转换成flink可执行的代码(DataStream)
[mw_shl_code=python,true]protected def translate[A](
logicalPlan: RelNode,
logicalType: RelDataType,
queryConfig: StreamQueryConfig,
withChangeFlag: Boolean)
(implicit tpe: TypeInformation[A]): DataStream[A] = {
// if no change flags are requested, verify table is an insert-only (append-only) table.
if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
throw new TableException(
"Table is not an append-only table. " +
"Use the toRetractStream() in order to handle add and retract messages.")
}
// 这个方法是转换核心,其实是调用各个DataStreamRel的implementation的tranlateToPlan
// get CRow plan
val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
// convert CRow to output type
val conversion = if (withChangeFlag) {
getConversionMapperWithChanges(
plan.getType,
new RowSchema(logicalType),
tpe,
"DataStreamSinkConversion")
} else {
getConversionMapper(
plan.getType,
new RowSchema(logicalType),
tpe,
"DataStreamSinkConversion")
}
val rootParallelism = plan.getParallelism
conversion match {
case mapFunction: MapFunction[CRow, A] =>
plan.map(mapFunction)
.returns(tpe)
.name(s"to: ${tpe.getTypeClass.getSimpleName}")
.setParallelism(rootParallelism)
}
}[/mw_shl_code]
自己的一些笔记:
HINT:
flink-table下有org.apache.flink.table.plan.nodes.datastream和org.apache.flink.table.plan.nodes.dataset两个包, 下面放着很多继承calcite XXXnode的class。
LogicalNode 貌似是TableAPI中 flink自身AST中的node,其中有toRelNode方法用于转化成calcite AST
DataStreamGroupWindowAggregate
AggregateUtil.createDataStreamAggregateFunction
作者:nicochen2012
来源:https://www.jianshu.com/p/3191b5b91d38
|
|