概述
Optimizer 中的预处理
当存在多列distinct计算时,Optimizer执行
RewriteDistinctAggregates
规则时,该规则会将多列distinct展开(通过插入Expand算子),非distinct聚合列和每个distinct聚合列会被分为不同的组(假设为N组),每个组为一行数据并带有group id,这样一行数据会被扩展为N行。之后,用两层Aggregate算子计算Expand之后的数据,第一层按前面的分组聚合,第二层再将结果聚合。引用
RewriteDistinctAggregates
的注释中的例子说明:
val data = Seq(
("a", "ca1", "cb1", 10),
("a", "ca1", "cb2", 5),
("b", "ca1", "cb1", 13))
.toDF("key", "cat1", "cat2", "value")
data.createOrReplaceTempView("data")
val agg = data.groupBy($"key")
.agg(
countDistinct($"cat1").as("cat1_cnt"),
countDistinct($"cat2").as("cat2_cnt"),
sum($"value").as("total"))
原始逻辑计划:
Aggregate(
key = ['key]
functions = [
COUNT(DISTINCT 'cat1),
COUNT(DISTINCT 'cat2),
sum('value)]
output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
LocalTableScan [...]
改造后逻辑计划
Aggregate(
key = ['key]
functions = [
count(if (
版权声明:本文为weixin_42265234原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。