Original SQL
```sql
explain
select
    count(distinct user_id)
    ,count(1)
from test
where dt = '2020-10-01';
```
Optimized SQL
```sql
explain
select
    count(1)       -- one row per user after the group by, so this equals count(distinct user_id)
    ,sum(ucnt)     -- summing the per-user counts reproduces the overall count(1)
from (
    select
        user_id
        ,count(1) as ucnt
    from test
    where dt = '2020-10-01'
    group by user_id
) t;
```
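As a quick sanity check that the rewrite is equivalent, here is a minimal sketch on an inline toy dataset (the `demo` CTE and the `explode` call are illustrative additions, written against Spark SQL syntax, and are not part of the original post):

```sql
-- 4 rows, 3 distinct users: both queries should return (3, 4)
with demo as (
    select explode(array(1, 1, 2, 3)) as user_id
)
select count(distinct user_id) as uv, count(1) as pv from demo;

with demo as (
    select explode(array(1, 1, 2, 3)) as user_id
)
select count(1) as uv, sum(ucnt) as pv
from (
    select user_id, count(1) as ucnt
    from demo
    group by user_id
) t;
```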
Spark execution plan
Original SQL:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), count(distinct user_id#2987358L)])
   +- Exchange SinglePartition, true, [id=#8407251]
      +- HashAggregate(keys=[], functions=[merge_count(1), partial_count(distinct user_id#2987358L)])
         +- HashAggregate(keys=[user_id#2987358L], functions=[merge_count(1)])
            +- Exchange hashpartitioning(user_id#2987358L, 200), true, [id=#8407247]
               +- HashAggregate(keys=[user_id#2987358L], functions=[partial_count(1)])
                  +- Scan hive tmp.test[user_id#2987358L], HiveTableRelation `tmp`.`test`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [user_id#2987358L, ... 244 more fields], [dt#2987626], Stream(CatalogPartition(
                        Partition Values: [dt=2020-10-01]
                        Location: hdfs://test/user/hive/warehouse/tmp.db/test/dt=2020-10-01
                        Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
                        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
                        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
Optimized SQL:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), sum(ucnt#2988041L)])
   +- Exchange SinglePartition, true, [id=#8408064]
      +- HashAggregate(keys=[], functions=[partial_count(1), partial_sum(ucnt#2988041L)])
         +- HashAggregate(keys=[user_id#2988049L], functions=[count(1)])
            +- Exchange hashpartitioning(user_id#2988049L, 200), true, [id=#8408060]
               +- HashAggregate(keys=[user_id#2988049L], functions=[partial_count(1)])
                  +- Scan hive tmp.test[user_id#2988049L], HiveTableRelation `tmp`.`test`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [user_id#2988049L, ... 244 more fields], [dt#2988317], Stream(CatalogPartition(
                        Partition Values: [dt=2020-10-01]
                        Location: hdfs://test/user/hive/warehouse/tmp.db/test/dt=2020-10-01
                        Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
                        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
                        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
                        Storage Properties: [serialization.format=1]
```
Hive execution plan
Original SQL:
```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test
            filterExpr: (dt = '2020-10-01') (type: boolean)
            Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: user_id (type: bigint)
              outputColumnNames: user_id
              Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(DISTINCT user_id), count(1)
                keys: user_id (type: bigint)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: bigint)
                  sort order: +
                  Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col0:0._col0), count(VALUE._col1)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: true
            Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
Optimized SQL:
```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test
            filterExpr: (dt = '2020-10-01') (type: boolean)
            Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: user_id (type: bigint)
              outputColumnNames: user_id
              Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(1)
                keys: user_id (type: bigint)
                mode: hash
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: bigint)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: bigint)
                  Statistics: Num rows: 117293697 Data size: 35083585688 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: bigint)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 58646848 Data size: 17541792694 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col1 (type: bigint)
            outputColumnNames: _col1
            Statistics: Num rows: 58646848 Data size: 17541792694 Basic stats: COMPLETE Column stats: NONE
            Group By Operator
              aggregations: count(1), sum(_col1)
              mode: hash
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: true
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint), _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0), sum(VALUE._col1)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: true
            Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
Summary
- For Spark, the original SQL and the rewritten SQL end up with essentially the same physical plan: the optimizer already turns count(distinct user_id) into a partial aggregation keyed by user_id, a shuffle on user_id, and a final merge, which is exactly the shape of the hand-optimized query. The manual rewrite therefore gains nothing in Spark.
- For Hive, the rewritten SQL has a group by, so the per-user aggregation in Stage-1 can be spread across many reducers, and Stage-2 then sums the partial results. This is the classic divide-and-conquer pattern: aggregate locally first, then merge globally.
- For Hive, the original SQL has no group by, so count(distinct user_id) funnels every row through a single reducer no matter how you set mapred.reduce.tasks or hive.exec.reducers.bytes.per.reducer, making the aggregation extremely slow on large inputs; see the sketch below.
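A sketch of the contrast (both parameter names appear in the summary above; the values chosen here are illustrative assumptions, not from the original post):

```sql
-- Reducer-parallelism knobs for classic MapReduce-mode Hive (example values)
set hive.exec.reducers.bytes.per.reducer=256000000;
set mapred.reduce.tasks=200;

-- Rewritten query: Stage-1 groups by user_id, so it can fan out across up to
-- 200 reducers; the tiny Stage-2 then merges the partial counts globally.
select count(1), sum(ucnt)
from (
    select user_id, count(1) as ucnt
    from test
    where dt = '2020-10-01'
    group by user_id
) t;

-- Original query: count(distinct user_id) with no group by still collapses
-- to a single reducer, regardless of the settings above.
select count(distinct user_id), count(1)
from test
where dt = '2020-10-01';
```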