Spark源码-2.3 Aggregate物理实现-3种聚合物理算子

  • Post author:
  • Post category:其他




概述



Optimizer 中的预处理

当存在多列distinct计算时,Optimizer执行

RewriteDistinctAggregates

规则时,该规则会将多列distinct展开(通过插入Expand算子),非distinct聚合列和每个distinct聚合列会被分为不同的组(假设为N组),每个组为一行数据并带有group id,这样一行数据会被扩展为N行。之后,用两层Aggregate算子计算Expand之后的数据,第一层按前面的分组聚合,第二层再将结果聚合。引用

RewriteDistinctAggregates

的注释中的例子说明:

val data = Seq(
    ("a", "ca1", "cb1", 10),
    ("a", "ca1", "cb2", 5),
    ("b", "ca1", "cb1", 13))
    .toDF("key", "cat1", "cat2", "value")
data.createOrReplaceTempView("data")

val agg = data.groupBy($"key")
        .agg(
            countDistinct($"cat1").as("cat1_cnt"),
            countDistinct($"cat2").as("cat2_cnt"),
        sum($"value").as("total"))

原始逻辑计划:

Aggregate(
    key = ['key]
    functions = [
        COUNT(DISTINCT 'cat1), 
        COUNT(DISTINCT 'cat2), 
        sum('value)]
    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
    LocalTableScan [...]

改造后逻辑计划

Aggregate(
    key = ['key]
    functions = [
        count(if (



版权声明:本文为weixin_42265234原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。