How the double GROUP BY optimization works





Original SQL

explain
select
      count(distinct user_id)
      ,count(1)
from
      test
where dt = '2020-10-01';
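
With count(distinct user_id) and no GROUP BY, the final distinct count has to be assembled in one place; how expensive that is depends on the engine, as the execution plans below show.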



Optimized SQL

explain
select
      count(1)
      ,sum(ucnt)
from (
      select
            user_id
            ,count(1) as ucnt
      from test
      where dt = '2020-10-01'
      group by user_id
) t;
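
Why the rewrite is equivalent: count(1) over the subquery counts the groups produced by GROUP BY user_id, which is exactly count(distinct user_id), and sum(ucnt) adds the per-user row counts back up to the total count(1). One caveat: if user_id can be NULL, GROUP BY keeps the NULLs as one extra group while count(distinct) ignores them. A strictly equivalent variant (a sketch, relying on the standard SQL rule that count(col) skips NULLs) is:

explain
select
      count(user_id)   -- counts only non-NULL groups, matching count(distinct user_id)
      ,sum(ucnt)       -- still sums over all rows, including any NULL user_id group
from (
      select
            user_id
            ,count(1) as ucnt
      from test
      where dt = '2020-10-01'
      group by user_id
) t;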



Spark execution plan

Original SQL:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), count(distinct user_id#2987358L)])
   +- Exchange SinglePartition, true, [id=#8407251]
      +- HashAggregate(keys=[], functions=[merge_count(1), partial_count(distinct user_id#2987358L)])
         +- HashAggregate(keys=[user_id#2987358L], functions=[merge_count(1)])
            +- Exchange hashpartitioning(user_id#2987358L, 200), true, [id=#8407247]
               +- HashAggregate(keys=[user_id#2987358L], functions=[partial_count(1)])
                  +- Scan hive tmp.test[user_id#2987358L], HiveTableRelation `tmp`.`test`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [user_id#2987358L, ... 244 more fields], [dt#2987626], Stream(CatalogPartition(
        Partition Values: [dt=2020-10-01]
        Location: hdfs://test/user/hive/warehouse/tmp.db/test/dt=2020-10-01
        Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat

Optimized SQL:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), sum(ucnt#2988041L)])
   +- Exchange SinglePartition, true, [id=#8408064]
      +- HashAggregate(keys=[], functions=[partial_count(1), partial_sum(ucnt#2988041L)])
         +- HashAggregate(keys=[user_id#2988049L], functions=[count(1)])
            +- Exchange hashpartitioning(user_id#2988049L, 200), true, [id=#8408060]
               +- HashAggregate(keys=[user_id#2988049L], functions=[partial_count(1)])
                  +- Scan hive tmp.test[user_id#2988049L], HiveTableRelation `tmp`.`test`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [user_id#2988049L, ... 244 more fields], [dt#2988317], Stream(CatalogPartition(
        Partition Values: [dt=2020-10-01]
        Location: hdfs://test/user/hive/warehouse/tmp.db/test/dt=2020-10-01
        Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
        Storage Properties: [serialization.format=1]
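
Note that the two physical plans are essentially identical: even for the original SQL, Spark expands count(distinct user_id) into a two-level aggregation, a partial count per user_id, an exchange hash-partitioned on user_id, a per-user merge, and only then a single-partition exchange for the final global counts. Spark's planner performs this rewrite on its own, which is why the manual rewrite gains little here.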



Hive execution plan

Original SQL:

STAGE DEPENDENCIES:                                
  Stage-1 is a root stage                          
  Stage-0 depends on stages: Stage-1               
                                                   
STAGE PLANS:                                       
  Stage: Stage-1                                   
    Map Reduce                                     
      Map Operator Tree:                           
          TableScan                                
            alias: test               
            filterExpr: (dt = '2020-10-01') (type: boolean) 
            Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
            Select Operator                        
              expressions: user_id (type: bigint)  
              outputColumnNames: user_id           
              Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
              Group By Operator                    
                aggregations: count(DISTINCT user_id), count(1) 
                keys: user_id (type: bigint)       
                mode: hash                         
                outputColumnNames: _col0, _col1, _col2 
                Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
                Reduce Output Operator             
                  key expressions: _col0 (type: bigint) 
                  sort order: +                    
                  Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
                  value expressions: _col2 (type: bigint) 
      Reduce Operator Tree:                        
        Group By Operator                          
          aggregations: count(DISTINCT KEY._col0:0._col0), count(VALUE._col1) 
          mode: mergepartial                       
          outputColumnNames: _col0, _col1          
          Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE 
          File Output Operator                     
            compressed: true                       
            Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE 
            table:                                 
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat 
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat 
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 
                                                  
  Stage: Stage-0                                  
    Fetch Operator                                
      limit: -1                                   
      Processor Tree:                             
        ListSink 
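
Everything here happens in a single MapReduce job: the map output is sorted by user_id (sort order: +) but, unlike the optimized plan below, the Reduce Output Operator has no "Map-reduce partition columns" entry, and the whole distinct merge plus the global count runs in one reducer.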

Optimized SQL:

STAGE DEPENDENCIES:                                
  Stage-1 is a root stage                          
  Stage-2 depends on stages: Stage-1               
  Stage-0 depends on stages: Stage-2               
                                                   
STAGE PLANS:                                       
  Stage: Stage-1                                   
    Map Reduce                                     
      Map Operator Tree:                           
          TableScan                                
            alias: test
            filterExpr: (dt = '2020-10-01') (type: boolean) 
            Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
            Select Operator                        
              expressions: user_id (type: bigint)  
              outputColumnNames: user_id           
              Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
              Group By Operator                    
                aggregations: count(1)             
                keys: user_id (type: bigint)       
                mode: hash                         
                outputColumnNames: _col0, _col1    
                Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
                Reduce Output Operator             
                  key expressions: _col0 (type: bigint) 
                  sort order: +                    
                  Map-reduce partition columns: _col0 (type: bigint) 
                  Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE 
                  value expressions: _col1 (type: bigint) 
      Reduce Operator Tree:                        
        Group By Operator                          
          aggregations: count(VALUE._col0)         
          keys: KEY._col0 (type: bigint)           
          mode: mergepartial                       
          outputColumnNames: _col0, _col1          
          Statistics: Num rows: 58646848 Data size: 17541792694 Basic stats: COMPLETE Column stats: NONE 
          Select Operator                          
            expressions: _col1 (type: bigint)      
            outputColumnNames: _col1               
            Statistics: Num rows: 58646848 Data size: 17541792694 Basic stats: COMPLETE Column stats: NONE 
            Group By Operator                      
              aggregations: count(1), sum(_col1)   
              mode: hash                           
              outputColumnNames: _col0, _col1      
              Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE 
              File Output Operator                 
                compressed: true                   
                table:                             
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat 
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat 
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe 
                                                   
  Stage: Stage-2                                   
    Map Reduce                                     
      Map Operator Tree:                           
          TableScan                                
            Reduce Output Operator                 
              sort order:                          
              Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE 
              value expressions: _col0 (type: bigint), _col1 (type: bigint) 
      Reduce Operator Tree:                        
        Group By Operator                          
          aggregations: count(VALUE._col0), sum(VALUE._col1) 
          mode: mergepartial                       
          outputColumnNames: _col0, _col1          
          Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE 
          File Output Operator                     
            compressed: true                       
            Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE 
            table:                                 
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat 
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat 
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 
                                                   
  Stage: Stage-0                                   
    Fetch Operator                                 
      limit: -1                                    
      Processor Tree:                              
        ListSink      
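
The optimized query is split into two MapReduce jobs: Stage-1 partitions the map output by user_id, so many reducers can each merge their share of per-user counts and pre-aggregate them into a single (count, sum) pair; Stage-2 then only has to combine those few pairs, so its single reducer is cheap.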



Summary

  1. For Spark, it makes little difference whether you write the original or the optimized SQL: Spark internally applies the same optimization and turns the original query into essentially the plan of the optimized one.
  2. For Hive, the optimized SQL has a GROUP BY, so the per-user aggregation can be spread across many reducers before the final sum. This is "divide and conquer": aggregate locally first, then merge globally.
  3. For Hive, the original SQL has no GROUP BY, so no matter how the parameters mapred.reduce.tasks and hive.exec.reducers.bytes.per.reducer are set, the query runs with only one reducer, which makes the aggregation extremely slow (sketched below).
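
A sketch of point 3, using arbitrary example values for the two parameters named above:

set mapred.reduce.tasks=200;                        -- has no effect on the original query
set hive.exec.reducers.bytes.per.reducer=268435456; -- likewise ignored by it

-- Original SQL: Stage-1 still runs with exactly one reducer, because Hive plans
-- a global aggregate without GROUP BY as a single-reduce job.
-- Optimized SQL: the Stage-1 GROUP BY user_id honors these settings and runs
-- with many reducers; only the tiny Stage-2 merge stays single-reduce.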


