分享

一文解析Flink SQL工作流程

问题导读:
1、如何创建calcite logical Plan?
2、什么负责sql的parse和验证?
3、FlinkRuleSets定义哪些rule?
4、如何将DataStreamRel转换成flink可执行的代码?






Flink Sql总体架构
2019-01-15_221633.jpg

Table API 和 SQL 如何创建calcite logical Plan (parsing)
2019-01-15_221724.jpg
两者略有不同
  • SQL:SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过validate验证sql语法(结合catalog)。最后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)
  • Table API -调用Table API 实际上是创建了很多 Table API 的 LogicalNode (生成了table API的抽象语法树),创建的过程中对会对整个query进行validate。LogicalNode 比如table是CalalogNode,window groupBy之后在select时会创建WindowAggregate和Project,where对应Filter。然后用calcite.RelBuilder翻译成Calcite LogicalPlan。如果是SQL API 将直接用Calcite的Parser进行解释然后validate生成Calcite LogicalPlan。


  • sql完全依靠calcite(sql parser)去做语法解析,validate后生成calcite logical plan. 而Table API先自己生成table API的logical plan,再通过calcite relbuilder translation成calcite logical plan。

2019-01-15_221929.jpg

使用calcite cost-based optimizor 进行优化。也就是说和spark不同, flink 的SQL Parsing, Analysing, Optimizing都是托管给calcite(flink会加入一些optimze rules). Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。

这里的优化规则分为两类,一类是Calcite提供的内置优化规则(如条件下推,剪枝等),再基于flink定制的一些优化rules(根据是streaming还是batch选择rulue)去优化logical Plan。

生成phsyical plan,基于flink里头的rules生成了DataStream Plan(Physical Plan)

将物Physical Plan转成Flink ExecutionPlan, 其通过调用相应的tanslateToPlan()转换和利用CodeGen成Flink的各种算子。

  • CodeGen 出的Function以字符串的形式存在。在提交任务后会分发到各个 > TaskManager 中运行,在运行时会使用 [Janino](http://janino-> compiler.github.io/janino/) 编译器编译代码后运行。

逻辑和spark类似,只不过calcite做了catalyst的事(sql parsing,analysis和optimizing)。


TableEnvironment
TableEnvironment.sql() - 负责sql的parse和验证

[mw_shl_code=python,true]def sql(query: String): Table = {
   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
   // parse the sql query, 创建sqlNode组成的AST
   val parsed = planner.parse(query)
   // validate the sql query
   val validated = planner.validate(parsed)
   // transform to a relational tree , 从纯AST 语法树转换成 relation expression tress,为optimze做准备
   val relational = planner.rel(validated)

   new Table(this, LogicalRelNode(relational.rel))
}[/mw_shl_code]

Table.writeToSink()

这个方法中主要调用了StreamTableEnvironment的optimze()和translate()两个方法。 负责在sql()中得到的AST tress的Optimize和生产physical plan的过程。
[mw_shl_code=python,true]override private[flink] def writeToSink[T](
      table: Table,
      sink: TableSink[T],
      queryConfig: QueryConfig): Unit = {

   case retractSink: RetractStreamTableSink[_] =>
        // retraction sink can always be used
        val outputType = sink.getOutputType
        // translate the Table into a DataStream and provide the type that the TableSink expects.
        val result: DataStream[T] =
          translate(
            table,
            streamQueryConfig,
            updatesAsRetraction = true,
            withChangeFlag = true)(outputType)
        // Give the DataStream to the TableSink to emit it.
        retractSink.asInstanceOf[RetractStreamTableSink[Any]]
          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
}[/mw_shl_code]

StreamTableEnvirnoment
SteamTableEnvironment.optimize()

FlinkRelNode三个children, FlinkLogicalRel,  DataStreamRel. DataSetRel
FlinkLogicalRel 对应生产的logical plan node
DataStreamRel对应data stream physical plan
DataSetRel 对应dataset physical plan
DataStreamGroupAggregateRule中
Stream
[mw_shl_code=python,true]// 1. decorrelate
// 转化掉correlated expressions, aka correlated subquery,subquery use Where clause expression refer to variable of table from outer query
    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

// 2. convert time indicators
    // 转换time的标识符,比如存在rowtime标识的话,我们将会引入TimeMaterializationSqlFunction operator,
    //这个operator我们会在codeGen中会用到
    val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

// 3. normalize the logical plan
    val normRuleSet = getNormRuleSet
    val normalizedPlan = if (normRuleSet.iterator().hasNext) {
      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
    } else {
      convPlan
    }

// 4. Optimize the logical Flink plan
    // 优化逻辑计划,做逻辑上的优化(如filter, projection push down)。同时将node转换成派生于FlinkLogicalRel的节点
  // define logcial plan rules
    val logicalOptRuleSet = getLogicalOptRuleSet
    //用FlinkConventions.LOGICAL替换traitSet, 作为target output traits
    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
    } else {
      normalizedPlan
    }

    // 将optimizd logical plan 转换为physical plan,同时将 节点转换成派生于DataStreamRel的节点
    val physicalOptRuleSet = getPhysicalOptRuleSet
    val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
      runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
    } else {
      logicalPlan
    }[/mw_shl_code]

FlinkRuleSets定义了很多rule, 用于在Optimize过程中

  • DATASTREAM_NORM_RULES: 把一些节点进行normalized转换
  • LOGICAL_OPT_RULES: logical plan优化rules, 里面包含都是sql语句相关的优化(下推,剪裁), 同时做转换:SQLRel -》FlinkLogicalRel
  • DATASTREAM_OPT_RULES: 把logical plan node转换成datastream physical
  • plan的rule, FlinkLogicalRel  -> DataStreamRel。其中的rules都继承自calcite ConvertRule, 而convertRule:> RelOptRule

ConvertRule

Abstract base class for a rule which converts from one calling convention to another without changing semantics.

ConvertRule有很多继承类(比如DATASTREAM_OPT_RULES中所有,以及LOGICAL_OPT_RULES中转换flinkLogicalPlan的rules), 都实现了其convert()接口,convert()会在onMatch中调用。
ConverterRule.onMatch(RelOptRuleCall call)
relOptRule提供了onMatch(RelOptRuleCall call) 被ConverterRule继承, 目的是当matchs() return true后被通知调用

[mw_shl_code=python,true]public void onMatch(RelOptRuleCall call) {
// rel 是RelOptRuleCall中rels[0] , 代表被convert的original rel
    RelNode rel = call.rel(0);
// 如果包含此trait,
    if (rel.getTraitSet().contains(inTrait)) {
      final RelNode converted = convert(rel);
      if (converted != null) {
        call.transformTo(converted);
      }
    }
  }
[/mw_shl_code]

  •     TODO LOGICAL_OPT_RULES中实现flinkLogicalPlan的rule不光实现了convert()还实现了matchs()
  •     convert()可以用debug走一遍, 看看inTrait到底是什么

Normalize Rules
[mw_shl_code=python,true]/**
    * RuleSet to normalize plans for stream / DataStream execution
    */
  val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
    // Transform window to LogicalWindowAggregate
    DataStreamLogicalWindowAggregateRule.INSTANCE,
    WindowStartEndPropertiesRule.INSTANCE,

    // simplify expressions rules
    // 这3个rule都是从3种expresion中移除constats(用RexLiteral替换)和多余的cast,
   //(如果cast(cast date),inner的input和outer的output类型相同,就能删掉)
    ReduceExpressionsRule.FILTER_INSTANCE,
    ReduceExpressionsRule.PROJECT_INSTANCE,
    ReduceExpressionsRule.CALC_INSTANCE,

//如果`org.apache.calcite.rel.core.Project`中包含over的expression(Projec中包含多个expressions.类似select多个column), 那么就拆分为一个logicalProject和一个LogicalWindow(分离为windowed agg和不相干的部分)
    ProjectToWindowRule.PROJECT
  )
[/mw_shl_code]

LogicalCalc -  A relational expression which computes project expressions and also filters. 它包含了{@link LogicalProject} and {@link LogicalFilter}的功能.


[mw_shl_code=python,true]val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

// calcite内部的一些默认优化
    // convert a logical table scan to a relational expression
    TableScanRule.INSTANCE,
    EnumerableToLogicalTableScan.INSTANCE,

    // push a filter into a join
    FilterJoinRule.FILTER_ON_JOIN,
    // push filter into the children of a join
    FilterJoinRule.JOIN,
    // push filter through an aggregation
    FilterAggregateTransposeRule.INSTANCE,
    还有很多....................................................
// flink提供的优化, project和filter下推到表scan
    // scan optimization
    PushProjectIntoTableSourceScanRule.INSTANCE,
    PushFilterIntoTableSourceScanRule.INSTANCE,

    // Unnest rule
    LogicalUnnestRule.INSTANCE,

    // translate to flink logical rel nodes
    FlinkLogicalAggregate.CONVERTER,
    FlinkLogicalWindowAggregate.CONVERTER,
    FlinkLogicalOverWindow.CONVERTER,
    FlinkLogicalCalc.CONVERTER,
    FlinkLogicalCorrelate.CONVERTER,
    FlinkLogicalIntersect.CONVERTER,
    FlinkLogicalJoin.CONVERTER,
    FlinkLogicalMinus.CONVERTER,
    FlinkLogicalSort.CONVERTER,
    FlinkLogicalUnion.CONVERTER,
    FlinkLogicalValues.CONVERTER,
    FlinkLogicalTableSourceScan.CONVERTER,
    FlinkLogicalTableFunctionScan.CONVERTER,
    FlinkLogicalNativeTableScan.CONVERTER
  )
[/mw_shl_code]

DATASTREAM_OPT_RULES 略, convertRule, 用于转换logical plan到DataStreamRel
DATASTREAM_DECO_RULES
/**
* RuleSet to decorate plans for stream / DataStream execution
转化为三种output表, 实现都是RelOptRule的实现
*/
val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
// retraction rules
DataStreamRetractionRules.DEFAULT_RETRACTION_INSTANCE,
DataStreamRetractionRules.UPDATES_AS_RETRACTION_INSTANCE,
DataStreamRetractionRules.ACCMODE_INSTANCE
)

SteamTableEnvironment.translate()
translate()在TableEnvironment.writeToSink中被call,主要逻辑是将DataStreamRel转换成flink可执行的代码(DataStream)


[mw_shl_code=python,true]protected def translate[A](
      logicalPlan: RelNode,
      logicalType: RelDataType,
      queryConfig: StreamQueryConfig,
      withChangeFlag: Boolean)
      (implicit tpe: TypeInformation[A]): DataStream[A] = {

    // if no change flags are requested, verify table is an insert-only (append-only) table.
    if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
      throw new TableException(
        "Table is not an append-only table. " +
        "Use the toRetractStream() in order to handle add and retract messages.")
    }

    // 这个方法是转换核心,其实是调用各个DataStreamRel的implementation的tranlateToPlan
    // get CRow plan
    val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)

    // convert CRow to output type
    val conversion = if (withChangeFlag) {
      getConversionMapperWithChanges(
        plan.getType,
        new RowSchema(logicalType),
        tpe,
        "DataStreamSinkConversion")
    } else {
      getConversionMapper(
        plan.getType,
        new RowSchema(logicalType),
        tpe,
        "DataStreamSinkConversion")
    }

    val rootParallelism = plan.getParallelism

    conversion match {
      case mapFunction: MapFunction[CRow, A] =>
        plan.map(mapFunction)
          .returns(tpe)
          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
          .setParallelism(rootParallelism)
    }
  }[/mw_shl_code]

自己的一些笔记:
HINT:
flink-table下有org.apache.flink.table.plan.nodes.datastream和org.apache.flink.table.plan.nodes.dataset两个包, 下面放着很多继承calcite XXXnode的class。
LogicalNode 貌似是TableAPI中 flink自身AST中的node,其中有toRelNode方法用于转化成calcite AST
DataStreamGroupWindowAggregate
AggregateUtil.createDataStreamAggregateFunction

作者:nicochen2012
来源:https://www.jianshu.com/p/3191b5b91d38


本帖被以下淘专辑推荐:

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条