目录
1.基本流程
Flink SQL的解析流程基于Calcite,通用流程包含以下几个步骤:1、Parse;2、Validate;3、TranslateToRel;4、Optimize;5、Execute
Parse阶段:此阶段是将用户输入的SQL语句转换为AST(抽象语法树),这一步的基础是Java CC。简单来说,就是设定好语法模板,然后将SQL语句与模板比较,将SQL语句变成分段节点。
Validate阶段:校验。基于schema信息,也就是库表列相关信息,对SQL语句进行校验,相应的库表列信息是否正确。
生成逻辑计划阶段:生成逻辑计划。前两步的结构都是SqlNode类型的,这一步是转换成RelNode/RelRoot类型的。SqlNode整体看上去还是跟写的SQL语句很形似的,RelNode则变成更加具体的结构化形态。
Optimize阶段:优化,即对前面生成的逻辑计划进行优化。因为前面都是根据用户的SQL进行直译生成的,所以执行性能不是最优的,这一步就是调整逻辑计划的顺序和结构,使运行最优。通常有RBO和CBO,即规则优化和代价优化。规则优化就是根据定义好的规则,直接应用,无关运行环境;代价优化则根据运行环境、历史运行信息等,推测可能的优化执行计划。
生成物理计划阶段:这一步就是将前面的RelNode转换成可执行的结构,对应不同的平台有不同的结果,如Flink的DataStream。
2.基本结构
Flink的代码执行都需要一个Environment,在SQL上,其Environment的基本接口就是TableEnvironment。类的集成关系如下,核心实现就是TableEnvironmentImpl
对于SQL查询,有两个比较重要的方法:sqlQuery、executeSql。其中sqlQuery并不会真正的执行SQL语句,需要后续调用
Table#execute()
才会真正执行,接口返回的是一个Table;executeSql就是直接执行SQL语句的接口,接口返回的是一个TableResult。
以executeSql追踪整个接口调用流程。
3.代码流程图
代码流程整体分两部分:1、parse解析SQL语句;2、一系列的转化优化处理
解析主要基于calcite的parse进行,flink也有一些扩展。此外,在解析后Flink还进行了额外的封装,将其封装成Operation。在封装这一步已经有了SqlNode向逻辑计划的转换操作,Operation封装是基于RelNode进行的。
第二阶段的核心是PlannerBase,相应转化都在其中进行,包括了优化及物理计划的转换等。
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
val transformations = translateToPlan(execGraph)
4.执行样例
通过Flink的LocalStreamEnvironment进行本地调试,查看每阶段的输出,调试使用的SQL语句为:
String querySql = "SELECT \n" +
" t1.id, 1 + 2 + t1.score AS v \n" +
"FROM t1, t2 \n" +
"WHERE \n" +
" t1.id = t2.id AND \n" +
" t2.id < 1000";
4.1.calcite的parse解析
对应ParserImpl.parse()中
SqlNodeList sqlNodeList = parser.parseSqlList(statement)
的执行结果
4.2.validate校验
对应SqlToOperationConverter.convert()中
final SqlNode validated = flinkPlanner.validate(sqlNode)
的执行结果。可以看到,解析完以后和解析前的一个重要差别就是表前面加上了catalog和database的信息
4.3.RelRoot获取
对应SqlToOperationConverter.toQueryOperation()中的
RelRoot relational = planner.rel(validated)
的执行结果。可以看到,这里已经完全看不出SQL语句的形态了,完全是结构化的形态了。
4.4.封装Operator
对应SqlToOperationConverter.toQueryOperation()中的
return new PlannerQueryOperation(relational.project());
的执行结果,基本上是做一个封装,内容基本没有太大变化。
4.5.translateToRel
接下来的几步都在PlannerBase.translate()当中。对应
val relNodes = modifyOperations.map(translateToRel)
的执行结果。因为前面已经转换过了,所以这里的基本结构没有太大变化,在顶部加入了LogicSink。这应该跟Flink的执行态有关,需要sink才是完整的流程。
4.6.optimize优化
对应
val optimizedRelNodes = optimize(relNodes)
的执行结果,做优化处理。这里明显可以看到的就是优化合并的,比如1+2合并成了3
4.7.转换物理计划
对应
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
的执行结果。这里可以看到,已经没有之前SQL和Calcite相关的结构了
4.8.translateToPlan
对应
val transformations = translateToPlan(execGraph)
的执行结果。这里就是转换成比较熟悉的DAG图的形态。