Flink-SQL基础部分

  • Post author:
  • Post category:其他




1.基本流程

Flink SQL的解析流程基于Calcite,通用流程包含以下几个步骤:1、Parse;2、Validate;3、TranslateToRel;4、Optimize;5、Execute

Parse阶段:此阶段是将用户输入的SQL语句转换为AST(抽象语法树),这一步的基础是Java CC。简单来说,就是设定好语法模板,然后将SQL语句与模板比较,将SQL语句变成分段节点。

Validate阶段:校验。基于schema信息,也就是库表列相关信息,对SQL语句进行校验,相应的库表列信息是否正确。

生成逻辑计划阶段:生成逻辑计划。前两步的结构都是SqlNode类型的,这一步是转换成RelNode/RelRoot类型的。SqlNode整体看上去还是跟写的SQL语句很形似的,RelNode则变成更加具体的结构化形态。

Optimize阶段:优化,即对前面生成的逻辑计划进行优化。因为前面都是根据用户的SQL进行直译生成的,所以执行性能不是最优的,这一步就是调整逻辑计划的顺序和结构,使运行最优。通常有RBO和CBO,即规则优化和代价优化。规则优化就是根据定义好的规则,直接应用,无关运行环境;代价优化则根据运行环境、历史运行信息等,推测可能的优化执行计划。

生成物理计划阶段:这一步就是将前面的RelNode转换成可执行的结构,对应不同的平台有不同的结果,如Flink的DataStream。



2.基本结构

Flink的代码执行都需要一个Environment,在SQL上,其Environment的基本接口就是TableEnvironment。类的集成关系如下,核心实现就是TableEnvironmentImpl

在这里插入图片描述

对于SQL查询,有两个比较重要的方法:sqlQuery、executeSql。其中sqlQuery并不会真正的执行SQL语句,需要后续调用

Table#execute()

才会真正执行,接口返回的是一个Table;executeSql就是直接执行SQL语句的接口,接口返回的是一个TableResult。

以executeSql追踪整个接口调用流程。



3.代码流程图

在这里插入图片描述

代码流程整体分两部分:1、parse解析SQL语句;2、一系列的转化优化处理

解析主要基于calcite的parse进行,flink也有一些扩展。此外,在解析后Flink还进行了额外的封装,将其封装成Operation。在封装这一步已经有了SqlNode向逻辑计划的转换操作,Operation封装是基于RelNode进行的。

第二阶段的核心是PlannerBase,相应转化都在其中进行,包括了优化及物理计划的转换等。

val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
val transformations = translateToPlan(execGraph)



4.执行样例

通过Flink的LocalStreamEnvironment进行本地调试,查看每阶段的输出,调试使用的SQL语句为:

String querySql = "SELECT \n" +
        "  t1.id, 1 + 2 + t1.score AS v \n" +
        "FROM t1, t2 \n" +
        "WHERE \n" +
        "  t1.id = t2.id AND \n" +
        "  t2.id < 1000";



4.1.calcite的parse解析

对应ParserImpl.parse()中

SqlNodeList sqlNodeList = parser.parseSqlList(statement)

的执行结果

在这里插入图片描述



4.2.validate校验

对应SqlToOperationConverter.convert()中

final SqlNode validated = flinkPlanner.validate(sqlNode)

的执行结果。可以看到,解析完以后和解析前的一个重要差别就是表前面加上了catalog和database的信息

在这里插入图片描述



4.3.RelRoot获取

对应SqlToOperationConverter.toQueryOperation()中的

RelRoot relational = planner.rel(validated)

的执行结果。可以看到,这里已经完全看不出SQL语句的形态了,完全是结构化的形态了。

在这里插入图片描述



4.4.封装Operator

对应SqlToOperationConverter.toQueryOperation()中的

return new PlannerQueryOperation(relational.project());

的执行结果,基本上是做一个封装,内容基本没有太大变化。

在这里插入图片描述



4.5.translateToRel

接下来的几步都在PlannerBase.translate()当中。对应

val relNodes = modifyOperations.map(translateToRel)

的执行结果。因为前面已经转换过了,所以这里的基本结构没有太大变化,在顶部加入了LogicSink。这应该跟Flink的执行态有关,需要sink才是完整的流程。

在这里插入图片描述



4.6.optimize优化

对应

val optimizedRelNodes = optimize(relNodes)

的执行结果,做优化处理。这里明显可以看到的就是优化合并的,比如1+2合并成了3

在这里插入图片描述



4.7.转换物理计划

对应

val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)

的执行结果。这里可以看到,已经没有之前SQL和Calcite相关的结构了

在这里插入图片描述



4.8.translateToPlan

对应

val transformations = translateToPlan(execGraph)

的执行结果。这里就是转换成比较熟悉的DAG图的形态。

在这里插入图片描述



版权声明:本文为blackjjcat原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。