Hive – How GROUP BY Is Implemented: An EXPLAIN Analysis





Preparing the data

The query under study:

SELECT uid, SUM(count) FROM logs GROUP BY uid;

hive> SELECT * FROM logs;
a	苹果	5
a	橙子	3
a	苹果	2
b	烧鸡	1

hive> SELECT uid, SUM(count) FROM logs GROUP BY uid;
a	10
b	1

The computation

[Figure: hive-groupby-cal]

hive.map.aggr defaults to true, so Hive first runs the GROUP BY on the mapper side and then merges the partial results, in order to cut down the amount of data the reducers have to process. Note that the mode shown by EXPLAIN differs between the two Group By Operators: the mapper's is hash, while the reducer's is mergepartial. If hive.map.aggr=false, the GROUP BY is deferred entirely to the reducers, and its mode is complete.
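The hash-then-mergepartial flow can be sketched outside of Hive. The following is an illustrative simulation, not Hive's actual operator code; the class and method names are invented for this sketch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation of hive.map.aggr=true (not Hive source). Each
// mapper keeps partial sums per key in a hash table (mode = hash); the
// reducer then merges the partials per key (mode = mergepartial).
public class MapAggrSketch {

    // Mapper side: partial aggregation of SUM(count) per uid in a hash table.
    static Map<String, Long> partialAggregate(List<String[]> rows) {
        Map<String, Long> partial = new HashMap<>();
        for (String[] row : rows) {
            partial.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return partial;
    }

    // Reducer side: merge the partial sums produced by all mappers.
    static Map<String, Long> mergePartials(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((uid, sum) -> merged.merge(uid, sum, Long::sum));
        }
        return merged;
    }
}
```

With the sample table above, a mapper holding the three `a` rows emits a single partial row (a, 10) instead of three rows — exactly the traffic reduction the plan is after.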



Operator

[Figure: hive-groupby-op]

Explain



hive> explain SELECT uid, sum(count) FROM logs group by uid;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan                  // scan the table
            alias: logs
            Select Operator          // project the columns
              expressions:
                    expr: uid
                    type: string
                    expr: count
                    type: int
              outputColumnNames: uid, count
              Group By Operator      // present because hive.map.aggr=true (the default): a partial aggregation on the mapper reduces the data the reducers must process
                aggregations:
                      expr: sum(count)   // the aggregate function
                bucketGroup: false
                keys:                    // the grouping key
                      expr: uid
                      type: string
                mode: hash               // hash mode; see processHashAggr()
                outputColumnNames: _col0, _col1
                Reduce Output Operator   // emit key/value pairs to the reducers
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)   // aggregate
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial             // merge the partial values
          outputColumnNames: _col0, _col1
          Select Operator                // project the columns
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator         // write the result to a file
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
hive> select distinct value from src;
hive> select max(key) from src;

These queries have no grouping keys, so only a single reducer is used.







2.2 If the query has aggregate functions or a GROUP BY, plan generation proceeds as follows:

First, a Select Operator that selects all columns is inserted; the ColumnPruner optimization later prunes it.



2.2.1 hive.map.aggr is true (the default), i.e. map-side partial aggregation is enabled.

2.2.1.1 hive.groupby.skewindata is false (the default), i.e. the GROUP BY data is not skewed.

The generated operators are: GroupByOperator + ReduceSinkOperator + GroupByOperator.

The GroupByOperator + ReduceSinkOperator pair runs on the map side; the first GroupByOperator performs the partial aggregation there. The second GroupByOperator performs the GROUP BY on the reduce side.



2.2.1.2 hive.groupby.skewindata is true.

The generated operators are: GroupByOperator + ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:

GroupByOperator + ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)



2.2.2 hive.map.aggr is false.

2.2.2.1 hive.groupby.skewindata is true.

The generated operators are: ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:

ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)



2.2.2.2 hive.groupby.skewindata is false.

The generated operators are: ReduceSinkOperator (runs in the map phase) + GroupByOperator (runs in the reduce phase).
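For intuition, the two-job shape used when hive.groupby.skewindata=true can be simulated as follows. This is a sketch with invented names, not Hive's implementation: job 1 spreads rows across reducers (by a random key, in the no-DISTINCT case) and aggregates partials; job 2 repartitions the partials by the true grouping key and finalizes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Sketch of the hive.groupby.skewindata=true plan (invented names, not Hive
// source). Job 1 partitions rows randomly so a hot key is split over several
// reducers, each producing partial sums; job 2 repartitions those partials by
// the real grouping key and merges them into the final result.
public class SkewGroupBySketch {

    // Job 1: random partitioning + partial aggregation per reducer.
    static List<Map<String, Long>> job1(List<String[]> rows, int reducers, long seed) {
        Random random = new Random(seed);        // the text notes Hive seeds with 12345
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < reducers; i++) partials.add(new HashMap<>());
        for (String[] row : rows) {
            int part = random.nextInt(reducers); // random partition key
            partials.get(part).merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return partials;
    }

    // Job 2: repartition by the grouping key and merge (mode = final).
    static Map<String, Long> job2(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }
}
```

Whatever the random partitioning does to the intermediate layout, the final merge by grouping key restores the correct totals; the extra job is the price paid for spreading a hot key.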








Case 1:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;

SemanticAnalyzer.genGroupByPlan1MR() {
  1) ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
  2) GroupByOperator: GroupByDesc.Mode.COMPLETE, Reducer: iterate/merge (mode = COMPLETE)
}




Case 2:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;

SemanticAnalyzer.genGroupByPlanMapAggr1MR() {
  1) GroupByOperator: GroupByDesc.Mode.HASH. The aggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
  2) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  3) GroupByOperator: GroupByDesc.Mode.MERGEPARTIAL, Reducer: iterate/terminate if DISTINCT, merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}




Case 3:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;

SemanticAnalyzer.genGroupByPlan2MR() {
  1) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  2) GroupByOperator: GroupByDesc.Mode.PARTIAL1, Reducer: iterate/terminatePartial (mode = PARTIAL1)
  3) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  4) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}




Case 4:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;

SemanticAnalyzer.genGroupByPlanMapAggr2MR() {
  1) GroupByOperator: GroupByDesc.Mode.HASH, Mapper: iterate/terminatePartial (mode = HASH)
  2) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT.
  3) GroupByOperator: GroupByDesc.Mode.PARTIALS, Reducer: iterate/terminatePartial if DISTINCT, merge/terminatePartial if NO DISTINCT (mode = PARTIALS)
  4) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
  5) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}






ReduceSinkOperator's processOp(Object row, int tag) sets the hash value of the key according to the conditions above. Take the first ReduceSinkOperator of case 4: its partitioning key is random() if there is no DISTINCT, and grouping + distinct key if there is a DISTINCT. If there is no DISTINCT column, the key's hash is set to a random number (random = new Random(12345);) before OutputCollector.collect is called. If there is a DISTINCT column, the key's hash is derived from the grouping + distinct key.
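The rule just described can be condensed into a few lines. This is a hypothetical helper for illustration, not the real processOp code:

```java
import java.util.Objects;
import java.util.Random;

// Hypothetical condensation of the partitioning rule described above (not the
// real ReduceSinkOperator.processOp): with no DISTINCT column the partition
// hash is a random number, so a skewed grouping key is spread over reducers;
// with DISTINCT it is derived from grouping key + distinct key, so equal
// (group, distinct) pairs still meet in the same reducer.
public class PartitionHashSketch {
    static final Random RANDOM = new Random(12345); // the seed the text mentions

    static int partitionHash(String groupKey, String distinctKey) {
        if (distinctKey == null) {
            return RANDOM.nextInt();                // no DISTINCT: random spread
        }
        return Objects.hash(groupKey, distinctKey); // DISTINCT: grouping + distinct key
    }
}
```

The DISTINCT case cannot use a random key: counting distinct values requires all copies of a (group, distinct) pair to land on one reducer.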








GroupByOperator's main methods:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)






groupby10.q / groupby11.q

set hive.map.aggr=false;
set hive.groupby.skewindata=false;

EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value, 5)), count(distinct substr(INPUT.value, 5))
GROUP BY INPUT.key;




STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator        // insertSelectAllPlanForGroupBy
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: complete
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1









set hive.map.aggr=true;
set hive.groupby.skewindata=false;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Group By Operator
                aggregations:
                      expr: count(substr(value, 5))
                      expr: count(DISTINCT substr(value, 5))
                bucketGroup: false
                keys:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                        expr: _col1
                        type: string
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
                        expr: _col3
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

















set hive.map.aggr=false;
set hive.groupby.skewindata=true;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: partial1
          outputColumnNames: _col0, _col1, _col2
          File Output Operator
            compressed: false
            GlobalTableId: 0
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
          Reduce Output Operator
            key expressions:
                  expr: _col0
                  type: int
            sort order: +
            Map-reduce partition columns:
                  expr: _col0
                  type: int
            tag: -1
            value expressions:
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: final
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1








set hive.map.aggr=true;
set hive.groupby.skewindata=true;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Group By Operator
                aggregations:
                      expr: count(substr(value, 5))
                      expr: count(DISTINCT substr(value, 5))
                bucketGroup: false
                keys:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: int
                        expr: _col1
                        type: string
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
                        expr: _col3
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: partials
          outputColumnNames: _col0, _col1, _col2
          File Output Operator
            compressed: false
            GlobalTableId: 0
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
          Reduce Output Operator
            key expressions:
                  expr: _col0
                  type: int
            sort order: +
            Map-reduce partition columns:
                  expr: _col0
                  type: int
            tag: -1
            value expressions:
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: count(VALUE._col1)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: final
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1

















set hive.map.aggr=false;
set hive.groupby.skewindata=false;

EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value, 5)), count(distinct substr(INPUT.value, 5))
GROUP BY INPUT.key;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        input
          TableScan
            alias: input
            Select Operator
              expressions:
                    expr: key
                    type: int
                    expr: value
                    type: string
              outputColumnNames: key, value
              Reduce Output Operator
                key expressions:
                      expr: key
                      type: int
                      expr: substr(value, 5)
                      type: string
                sort order: ++
                Map-reduce partition columns:
                      expr: key
                      type: int
                tag: -1
      Needs Tagging: false
      Path -> Alias:
        hdfs://localhost:54310/user/hive/warehouse/input [input]
      Path -> Partition:
        hdfs://localhost:54310/user/hive/warehouse/input
          Partition
            base file name: input
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              columns key,value
              columns.types int:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://localhost:54310/user/hive/warehouse/input
              name input
              serialization.ddl struct input { i32 key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              transient_lastDdlTime 1310523947
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              columns key,value
              columns.types int:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://localhost:54310/user/hive/warehouse/input
              name input
              serialization.ddl struct input { i32 key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              transient_lastDdlTime 1310523947
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            name: input
          name: input
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(KEY._col1:0._col0)
                expr: count(DISTINCT KEY._col1:0._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: int
          mode: complete
          outputColumnNames: _col0, _col1, _col2
          Select Operator
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: bigint
            outputColumnNames: _col0, _col1, _col2
            Select Operator
              expressions:
                    expr: _col0
                    type: int
                    expr: UDFToInteger(_col1)
                    type: int
                    expr: UDFToInteger(_col2)
                    type: int
              outputColumnNames: _col0, _col1, _col2
              File Output Operator
                compressed: false
                GlobalTableId: 1
                directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
                NumFilesPerFileSink: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      columns key,val1,val2
                      columns.types int:int:int
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://localhost:54310/user/hive/warehouse/dest1
                      name dest1
                      serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      transient_lastDdlTime 1310523946
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: dest1
                TotalFiles: 1
                MultiFileSpray: false

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                bucket_count -1
                columns key,val1,val2
                columns.types int:int:int
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://localhost:54310/user/hive/warehouse/dest1
                name dest1
                serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                transient_lastDdlTime 1310523946
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: dest1
          tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001




ABSTRACT SYNTAX TREE:
(TOK_QUERY
  (TOK_FROM (TOK_TABREF INPUT))
  (TOK_INSERT
    (TOK_DESTINATION (TOK_TAB dest1))
    (TOK_SELECT
      (TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
      (TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
      (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
    )
    (TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
  )
)







SemanticAnalyzer.genBodyPlan(QB qb, Operator input) {
  if (qbp.getAggregationExprsForClause(dest).size() != 0
      || getGroupByForClause(qbp, dest).size() > 0) { // if there are aggregate functions or a GROUP BY, do the following
    // multiple distincts is not supported with skew in data
    if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true") &&
        qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
      throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg());
    }
    // insert a select operator here used by the ColumnPruner to reduce
    // the data to shuffle
    curr = insertSelectAllPlanForGroupBy(dest, curr); // generates a SelectOperator that selects all columns (selectStar = true)
    if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
        .equalsIgnoreCase("true")) {
      if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
          .equalsIgnoreCase("false")) {
        curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
      } else {
        curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
      }
    } else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")) {
      curr = genGroupByPlan2MR(dest, qb, curr);
    } else {
      curr = genGroupByPlan1MR(dest, qb, curr);
    }
  }
}




Test files that exercise DISTINCT:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out






  773. 1


    )map端部分聚合,数据无倾斜,一个MR生成。



  774. genGroupByPlanMapAggr1MR,生成三个Operator:




  775. 1.1


    )GroupByOperator:map-side partial aggregation,由genGroupByPlanMapGroupByOperator方法生成:



  776. 处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames


  777. 处理select中的Distinct,getDistinctFuncExprsForClause,Distinct的column,加入groupByKeys和outputColumnNames


  778. 处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
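Conceptually, Mode.HASH keeps an in-memory hash table from the group-by key to a partial aggregate (the processHashAggr() path mentioned in the explain output earlier). A minimal sketch of that idea with the logs data, assuming a plain SUM; class and method names are illustrative, not Hive's:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified picture of GroupByDesc.Mode.HASH on the map side: rows are
// iterated into a hash table keyed by the group-by column, and one partial
// aggregate per key is emitted to the shuffle (iterate/terminatePartial).
public class MapSideHashAggr {
    static Map<String, Long> partialSums(String[][] rows) {
        Map<String, Long> hash = new LinkedHashMap<>();
        for (String[] row : rows) {            // row = {uid, count}
            hash.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return hash;                           // partials shipped to reducers
    }

    public static void main(String[] args) {
        // the logs example: a -> 5, 3, 2 and b -> 1
        String[][] rows = {{"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"}};
        System.out.println(partialSums(rows)); // {a=10, b=1}
    }
}
```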









1.2) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process DISTINCT in the SELECT (getDistinctFuncExprsForClause); DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby + distinct
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






1.3) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and the aggregated columns are added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
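Mode.MERGEPARTIAL means the reducer receives already-summed partials for the same key from different mappers and merges them (merge/terminate). A toy sketch of that merge step; names are illustrative, not Hive's:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy picture of Mode.MERGEPARTIAL: each mapper ships one partial sum per
// key; the reducer merges partials with the same key and emits the final sum.
public class MergePartialSketch {
    static Map<String, Long> reduce(List<Map<String, Long>> mapperPartials) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map<String, Long> partial : mapperPartials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // mapper 1 already collapsed a -> 5+3; mapper 2 saw a -> 2 and b -> 1
        System.out.println(reduce(List.of(
                Map.of("a", 8L),
                Map.of("a", 2L, "b", 1L))));   // {a=10, b=1}
    }
}
```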









2) Map-side partial aggregation, data skew: two MR jobs. genGroupByPlanMapAggr2MR:

2.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process DISTINCT in the SELECT (getDistinctFuncExprsForClause); DISTINCT columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and its column is added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9









2.2) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process DISTINCT in the SELECT (getDistinctFuncExprsForClause); DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby + distinct
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






2.3) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and its column is added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.PARTIALS
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9









2.4) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






2.5) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and the aggregated columns are added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
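The two-job skew plan can be pictured end to end: job 1 sprays rows to reducers at random, so a hot key is split across reducers and partially aggregated (PARTIALS); job 2 repartitions the partials by the real group-by key and merges them (FINAL). A toy single-process simulation, not Hive code; the seed is arbitrary:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class SkewTwoStage {
    // Stage 1: random partitioning + partial sums per reducer.
    // Stage 2: repartition partials by the real key and merge to final sums.
    static Map<String, Long> twoStage(String[][] rows, int reducers, long seed) {
        Random random = new Random(seed);
        List<Map<String, Long>> stage1 = new ArrayList<>();
        for (int i = 0; i < reducers; i++) {
            stage1.add(new HashMap<>());
        }
        for (String[] row : rows) {                  // random spray of hot keys
            stage1.get(random.nextInt(reducers))
                  .merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        Map<String, Long> finals = new TreeMap<>();  // stage 2: merge by key
        for (Map<String, Long> partial : stage1) {
            partial.forEach((k, v) -> finals.merge(k, v, Long::sum));
        }
        return finals;
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"}};
        System.out.println(twoStage(rows, 2, 12345L));  // {a=10, b=1}
    }
}
```

However the random spray distributes the rows, the merged totals are the same; only the intermediate load per reducer changes.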











3) No map-side partial aggregation, data skew: two MR jobs. genGroupByPlan2MR:

3.1) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process DISTINCT in the SELECT (getDistinctFuncExprsForClause); DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby + distinct
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






3.2) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and its column is added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.PARTIAL1
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9









3.3) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






3.4) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); an AggregationDesc is generated and added to aggregations, and the aggregated columns are added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9











4) No map-side partial aggregation, no data skew: one MR job. genGroupByPlan1MR:

4.1) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Process DISTINCT in the SELECT (getDistinctFuncExprsForClause); DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: groupby + distinct
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: aggregated columns
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






4.2) GroupByOperator
Process the GROUP BY clause (getGroupByForClause); GROUP BY columns are added to groupByKeys and outputColumnNames.
Process aggregate functions (getAggregationExprsForClause); columns to aggregate are added to aggregations and outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}

mode: GroupByDesc.Mode.COMPLETE
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
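With Mode.COMPLETE the reducer does all the aggregation, so genGroupByPlan1MR ships every raw row through the shuffle, while the map-aggregation plans ship only one partial per distinct key. A toy comparison of shuffle volume on the logs data (illustrative names, not Hive code):

```java
import java.util.HashSet;
import java.util.Set;

// hive.map.aggr=false: the mapper emits every input row into the shuffle.
// hive.map.aggr=true: the mapper emits one partial per distinct key.
public class ShuffleVolume {
    static int shuffledRecords(String[][] rows, boolean mapAggr) {
        if (!mapAggr) {
            return rows.length;            // raw rows go to the reducer
        }
        Set<String> keys = new HashSet<>();
        for (String[] row : rows) {
            keys.add(row[0]);              // one partial per group-by key
        }
        return keys.size();
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"}};
        System.out.println(shuffledRecords(rows, false) + " vs "
                + shuffledRecords(rows, true));   // 4 vs 2
    }
}
```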











  1077. SemanticAnalyzer.genBodyPlan


  1078. optimizeMultiGroupBy  (multi-group by with the same distinct)


  1079. groupby10.q  groupby11.q



准备数据



  1. SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;



  1. hive> SELECT * FROM logs;


  2. a   苹果

    5




  3. a   橙子

    3




  4. a      苹果

    2




  5. b   烧鸡

    1





  6. hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;

  7. a

    10




  8. b

    1





计算过程


hive-groupby-cal


默认设置了hive.map.aggr=true,所以会在mapper端先group by一次,最后再把结果merge起来,为了减少reducer处理的数据量。注意看explain的mode是不一样的。mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那将groupby放到reducer才做,他的mode是complete.



Operator


hive-groupby-op



Explain



  1. hive> explain SELECT uid, sum(count) FROM logs group by uid;


  2. OK

  3. ABSTRACT SYNTAX TREE:

  4. (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))


  5. STAGE DEPENDENCIES:

  6. Stage-

    1


    is a root stage


  7. Stage-

    0


    is a root stage



  8. STAGE PLANS:

  9. Stage: Stage-

    1




  10. Map Reduce

  11. Alias -> Map Operator Tree:

  12. logs

  13. TableScan

    // 扫描表




  14. alias: logs

  15. Select Operator

    //选择字段




  16. expressions:

  17. expr: uid

  18. type: string

  19. expr: count

  20. type:

    int




  21. outputColumnNames: uid, count

  22. Group By Operator

    //这里是因为默认设置了hive.map.aggr=true,会在mapper先做一次聚合,减少reduce需要处理的数据




  23. aggregations:

  24. expr: sum(count)

    //聚集函数




  25. bucketGroup:

    false




  26. keys:

    //键




  27. expr: uid

  28. type: string

  29. mode: hash

    //hash方式,processHashAggr()




  30. outputColumnNames: _col0, _col1

  31. Reduce Output Operator

    //输出key,value给reducer




  32. key expressions:

  33. expr: _col0

  34. type: string

  35. sort order: +

  36. Map-reduce partition columns:

  37. expr: _col0

  38. type: string

  39. tag: –

    1




  40. value expressions:

  41. expr: _col1

  42. type: bigint

  43. Reduce Operator Tree:

  44. Group By Operator


  45. aggregations:

  46. expr: sum(VALUE._col0)


  47. //聚合




  48. bucketGroup:

    false




  49. keys:

  50. expr: KEY._col0

  51. type: string

  52. mode: mergepartial

    //合并值




  53. outputColumnNames: _col0, _col1

  54. Select Operator

    //选择字段




  55. expressions:

  56. expr: _col0

  57. type: string

  58. expr: _col1

  59. type: bigint

  60. outputColumnNames: _col0, _col1

  61. File Output Operator

    //输出到文件




  62. compressed:

    false




  63. GlobalTableId:

    0




  64. table:

  65. input format: org.apache.hadoop.mapred.TextInputFormat

  66. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat


  67. Stage: Stage-

    0




  68. Fetch Operator

  69. limit: –

    1





  70. hive> select distinct value from src;

  71. hive> select max(key) from src;


  72. 因为没有grouping keys,所以只有一个reducer。







  73. 2.2


    如果有聚合函数或者groupby,做如下处理:



  74. 插入一个select operator,选取所有的字段,用于优化阶段ColumnPruner的优化



  75. 2.2


    .


    1


    hive.map.aggr为


    true


    ,默认是


    true


    ,开启的,在map端做部分聚合




  76. 2.2


    .


    1.1


    hive.groupby.skewindata为


    false


    ,默认是关闭的,groupby的数据没有倾斜。



  77. 生成的operator是: GroupByOperator+ReduceSinkOperator+GroupByOperator。


  78. GroupByOperator+ReduceSinkOperator用于在map端做操作,第一个GroupByOperator在map端先做部分聚合。第二个用于在reduce端做GroupBy操作



  79. 2.2


    .


    1.2


    hive.groupby.skewindata为


    true





  80. 生成的operator是: GroupbyOperator+ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator


  81. GroupbyOperator+ReduceSinkOperator(第一个MapredTask的map阶段)


  82. GroupbyOperator(第一个MapredTask的reduce阶段)


  83. ReduceSinkOperator (第二个MapredTask的map阶段)


  84. GroupByOperator(第二个MapredTask的reduce阶段)



  85. 2.2


    .


    2


    hive.map.aggr为


    false






  86. 2.2


    .


    2.1


    hive.groupby.skewindata为


    true





  87. 生成的operator是: ReduceSinkOperator+GroupbyOperator+ReduceSinkOperator +GroupByOperator


  88. ReduceSinkOperator(第一个MapredTask的map阶段)


  89. GroupbyOperator(第一个MapredTask的reduce阶段)


  90. ReduceSinkOperator (第二个MapredTask的map阶段)


  91. GroupByOperator(第二个MapredTask的reduce阶段)



  92. 2.2


    .


    2.2


    hive.groupby.skewindata为


    false





  93. 生成的operator是: ReduceSinkOperator(map阶段运行)+GroupbyOperator(reduce阶段运行)








  94. 第一种情况:


  95. set hive.map.aggr=

    false


    ;



  96. set hive.groupby.skewindata=

    false


    ;



  97. SemanticAnalyzer.genGroupByPlan1MR(){




  98. 1


    )ReduceSinkOperator: It will put all Group By keys and the


  99. distinct field (

    if


    any) in the map-reduce sort key, and all other fields


  100. in the map-reduce value.




  101. 2


    )GroupbyOperator:GroupByDesc.Mode.COMPLETE,Reducer: iterate/merge (mode = COMPLETE)



  102. }




  103. 第二种情况:


  104. set hive.map.aggr=

    true


    ;



  105. set hive.groupby.skewindata=

    false


    ;



  106. SemanticAnalyzer.genGroupByPlanMapAggr1MR(){




  107. 1


    )GroupByOperator:GroupByDesc.Mode.HASH,The agggregation


  108. evaluation functions are as follows: Mapper: iterate/terminatePartial

  109. (mode = HASH)




  110. 2


    )ReduceSinkOperator:Partitioning Key: grouping key。Sorting Key:


  111. grouping key

    if


    no DISTINCT grouping + distinct key


    if


    DISTINCT





  112. 3


    )GroupByOperator:GroupByDesc.Mode.MERGEPARTIAL,Reducer:


  113. iterate/terminate

    if


    DISTINCT merge/terminate


    if


    NO DISTINCT (mode =


  114. MERGEPARTIAL)


  115. }




  116. 第三种情况:


  117. set hive.map.aggr=

    false


    ;



  118. set hive.groupby.skewindata=

    true


    ;



  119. SemanticAnalyzer.genGroupByPlan2MR(){




  120. 1


    )ReduceSinkOperator:Partitioning Key: random()


    if


    no DISTINCT


  121. grouping + distinct key

    if


    DISTINCT。Sorting Key: grouping key


    if


    no


  122. DISTINCT grouping + distinct key

    if


    DISTINCT





  123. 2


    )GroupbyOperator:GroupByDesc.Mode.PARTIAL1,Reducer: iterate/terminatePartial (mode = PARTIAL1)





  124. 3


    )ReduceSinkOperator:Partitioning Key: grouping key。Sorting


  125. Key: grouping key

    if


    no DISTINCT grouping + distinct key


    if


    DISTINCT





  126. 4


    )GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)



  127. }




  128. 第四种情况:


  129. set hive.map.aggr=

    true


    ;



  130. set hive.groupby.skewindata=

    true


    ;



  131. SemanticAnalyzer.genGroupByPlanMapAggr2MR(){




  132. 1


    )GroupbyOperator:GroupByDesc.Mode.HASH,Mapper: iterate/terminatePartial (mode = HASH)





  133. 2


    )ReduceSinkOperator: Partitioning Key: random()


    if


    no


  134. DISTINCT grouping + distinct key

    if


    DISTINCT。 Sorting Key: grouping key



  135. if


    no DISTINCT grouping + distinct key


    if


    DISTINCT。





  136. 3


    )GroupbyOperator:GroupByDesc.Mode.PARTIALS, Reducer:


  137. iterate/terminatePartial

    if


    DISTINCT merge/terminatePartial


    if


    NO


  138. DISTINCT (mode = MERGEPARTIAL)




  139. 4


    )ReduceSinkOperator:Partitioining Key: grouping key。Sorting


  140. Key: grouping key

    if


    no DISTINCT grouping + distinct key


    if


    DISTINCT





  141. 5


    )GroupByOperator:GroupByDesc.Mode.FINAL,Reducer: merge/terminate (mode = FINAL)



  142. }






  143. ReduceSinkOperator的processOp(Object row,

    int




  144. tag)会根据相应的条件设置Key的hash值,如第四种情况的第一个ReduceSinkOperator:Partitioning Key:

  145. random()

    if


    no DISTINCT grouping + distinct key


    if




  146. DISTINCT,如果没有DISTINCT字段,那么在OutputCollector.collect前会设置当前Key的hash值为一个随机

  147. 数,random =

    new


    Random(


    12345


    );。如果有DISTINCT字段,那么key的hash值跟grouping +


  148. distinct key有关。








  149. GroupByOperator:


  150. initializeOp(Configuration hconf)


  151. processOp(Object row,

    int


    tag)



  152. closeOp(

    boolean


    abort)



  153. forward(ArrayList<Object> keys, AggregationBuffer[] aggs)






  154. groupby10.q   groupby11.q


  155. set hive.map.aggr=

    false


    ;



  156. set hive.groupby.skewindata=

    false


    ;





  157. EXPLAIN


  158. FROM INPUT


  159. INSERT OVERWRITE TABLE dest1 SELECT INPUT.key,

  160. count(substr(INPUT.value,

    5


    )), count(distinct substr(INPUT.value,


    5


    ))


  161. GROUP BY INPUT.key;




  162. STAGE DEPENDENCIES:


  163. Stage-

    1


    is a root stage



  164. Stage-

    0


    depends on stages: Stage-


    1







  165. STAGE PLANS:


  166. Stage: Stage-

    1





  167. Map Reduce


  168. Alias -> Map Operator Tree:


  169. input


  170. TableScan


  171. alias: input


  172. Select Operator

    // insertSelectAllPlanForGroupBy





  173. expressions:


  174. expr: key


  175. type:

    int





  176. expr: value


  177. type: string


  178. outputColumnNames: key, value


  179. Reduce Output Operator


  180. key expressions:


  181. expr: key


  182. type:

    int





  183. expr: substr(value,

    5


    )



  184. type: string


  185. sort order: ++


  186. Map-reduce partition columns:


  187. expr: key


  188. type:

    int





  189. tag: –

    1





  190. Reduce Operator Tree:


  191. Group By Operator


  192. aggregations:


  193. expr: count(KEY._col1:

    0


    ._col0)



  194. expr: count(DISTINCT KEY._col1:

    0


    ._col0)



  195. bucketGroup:

    false





  196. keys:


  197. expr: KEY._col0


  198. type:

    int





  199. mode: complete


  200. outputColumnNames: _col0, _col1, _col2


  201. Select Operator


  202. expressions:


  203. expr: _col0


  204. type:

    int





  205. expr: _col1


  206. type: bigint


  207. expr: _col2


  208. type: bigint


  209. outputColumnNames: _col0, _col1, _col2


  210. Select Operator


  211. expressions:


  212. expr: _col0


  213. type:

    int





  214. expr: UDFToInteger(_col1)


  215. type:

    int





  216. expr: UDFToInteger(_col2)


  217. type:

    int





  218. outputColumnNames: _col0, _col1, _col2


  219. File Output Operator


  220. compressed:

    false





  221. GlobalTableId:

    1





  222. table:


  223. input format: org.apache.hadoop.mapred.TextInputFormat


  224. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat


  225. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe


  226. name: dest1




  227. Stage: Stage-

    0





  228. Move Operator


  229. tables:


  230. replace:

    true





  231. table:


  232. input format: org.apache.hadoop.mapred.TextInputFormat


  233. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat


  234. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe


  235. name: dest1










  236. set hive.map.aggr=true;
  237. set hive.groupby.skewindata=false;
  238. STAGE DEPENDENCIES:
  239. Stage-1 is a root stage
  240. Stage-0 depends on stages: Stage-1
  241. STAGE PLANS:
  242. Stage: Stage-1
  243. Map Reduce
  244. Alias -> Map Operator Tree:
  245. input
  246. TableScan
  247. alias: input
  248. Select Operator
  249. expressions:
  250. expr: key
  251. type: int
  252. expr: value
  253. type: string
  254. outputColumnNames: key, value
  255. Group By Operator
  256. aggregations:
  257. expr: count(substr(value, 5))
  258. expr: count(DISTINCT substr(value, 5))
  259. bucketGroup: false
  260. keys:
  261. expr: key
  262. type: int
  263. expr: substr(value, 5)
  264. type: string
  265. mode: hash
  266. outputColumnNames: _col0, _col1, _col2, _col3
  267. Reduce Output Operator
  268. key expressions:
  269. expr: _col0
  270. type: int
  271. expr: _col1
  272. type: string
  273. sort order: ++
  274. Map-reduce partition columns:
  275. expr: _col0
  276. type: int
  277. tag: -1
  278. value expressions:
  279. expr: _col2
  280. type: bigint
  281. expr: _col3
  282. type: bigint
  283. Reduce Operator Tree:
  284. Group By Operator
  285. aggregations:
  286. expr: count(VALUE._col0)
  287. expr: count(DISTINCT KEY._col1:0._col0)
  288. bucketGroup: false
  289. keys:
  290. expr: KEY._col0
  291. type: int
  292. mode: mergepartial
  293. outputColumnNames: _col0, _col1, _col2
  294. Select Operator
  295. expressions:
  296. expr: _col0
  297. type: int
  298. expr: _col1
  299. type: bigint
  300. expr: _col2
  301. type: bigint
  302. outputColumnNames: _col0, _col1, _col2
  303. Select Operator
  304. expressions:
  305. expr: _col0
  306. type: int
  307. expr: UDFToInteger(_col1)
  308. type: int
  309. expr: UDFToInteger(_col2)
  310. type: int
  311. outputColumnNames: _col0, _col1, _col2
  312. File Output Operator
  313. compressed: false
  314. GlobalTableId: 1
  315. table:
  316. input format: org.apache.hadoop.mapred.TextInputFormat
  317. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  318. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  319. name: dest1
  320. Stage: Stage-0
  321. Move Operator
  322. tables:
  323. replace: true
  324. table:
  325. input format: org.apache.hadoop.mapred.TextInputFormat
  326. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  327. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  328. name: dest1
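The hash / mergepartial pair in this plan is the generic two-phase aggregation pattern: each mapper keeps an in-memory hash table of partial aggregates keyed by the group-by columns, and the reducer merges the partials for each key. A minimal sketch of the idea in plain Java (not Hive's actual operator code; the class and method names are made up for illustration):

```java
import java.util.*;

public class TwoPhaseAgg {
    // Map side, mode=hash: keep partial sums per key in a hash table.
    static Map<String, Long> mapSidePartial(List<String[]> rows) {
        Map<String, Long> partial = new HashMap<>();
        for (String[] r : rows) {
            partial.merge(r[0], Long.parseLong(r[1]), Long::sum);
        }
        return partial;
    }

    // Reduce side, mode=mergepartial: merge the partials emitted by each mapper.
    static Map<String, Long> reduceSideMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two "mappers", each pre-aggregating its own split of the logs table.
        Map<String, Long> m1 = mapSidePartial(Arrays.asList(
            new String[]{"a", "5"}, new String[]{"a", "3"}));
        Map<String, Long> m2 = mapSidePartial(Arrays.asList(
            new String[]{"a", "2"}, new String[]{"b", "1"}));
        Map<String, Long> merged = reduceSideMerge(Arrays.asList(m1, m2));
        System.out.println(merged.get("a") + " " + merged.get("b")); // 10 1
    }
}
```

Only the per-key partials cross the network, which is exactly why hive.map.aggr=true reduces the reducer's input volume.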


















  329. set hive.map.aggr=false;
  330. set hive.groupby.skewindata=true;
  331. STAGE DEPENDENCIES:
  332. Stage-1 is a root stage
  333. Stage-2 depends on stages: Stage-1
  334. Stage-0 depends on stages: Stage-2
  335. STAGE PLANS:
  336. Stage: Stage-1
  337. Map Reduce
  338. Alias -> Map Operator Tree:
  339. input
  340. TableScan
  341. alias: input
  342. Select Operator
  343. expressions:
  344. expr: key
  345. type: int
  346. expr: value
  347. type: string
  348. outputColumnNames: key, value
  349. Reduce Output Operator
  350. key expressions:
  351. expr: key
  352. type: int
  353. expr: substr(value, 5)
  354. type: string
  355. sort order: ++
  356. Map-reduce partition columns:
  357. expr: key
  358. type: int
  359. tag: -1
  360. Reduce Operator Tree:
  361. Group By Operator
  362. aggregations:
  363. expr: count(KEY._col1:0._col0)
  364. expr: count(DISTINCT KEY._col1:0._col0)
  365. bucketGroup: false
  366. keys:
  367. expr: KEY._col0
  368. type: int
  369. mode: partial1
  370. outputColumnNames: _col0, _col1, _col2
  371. File Output Operator
  372. compressed: false
  373. GlobalTableId: 0
  374. table:
  375. input format: org.apache.hadoop.mapred.SequenceFileInputFormat
  376. output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
  377. Stage: Stage-2
  378. Map Reduce
  379. Alias -> Map Operator Tree:
  380. hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
  381. Reduce Output Operator
  382. key expressions:
  383. expr: _col0
  384. type: int
  385. sort order: +
  386. Map-reduce partition columns:
  387. expr: _col0
  388. type: int
  389. tag: -1
  390. value expressions:
  391. expr: _col1
  392. type: bigint
  393. expr: _col2
  394. type: bigint
  395. Reduce Operator Tree:
  396. Group By Operator
  397. aggregations:
  398. expr: count(VALUE._col0)
  399. expr: count(VALUE._col1)
  400. bucketGroup: false
  401. keys:
  402. expr: KEY._col0
  403. type: int
  404. mode: final
  405. outputColumnNames: _col0, _col1, _col2
  406. Select Operator
  407. expressions:
  408. expr: _col0
  409. type: int
  410. expr: _col1
  411. type: bigint
  412. expr: _col2
  413. type: bigint
  414. outputColumnNames: _col0, _col1, _col2
  415. Select Operator
  416. expressions:
  417. expr: _col0
  418. type: int
  419. expr: UDFToInteger(_col1)
  420. type: int
  421. expr: UDFToInteger(_col2)
  422. type: int
  423. outputColumnNames: _col0, _col1, _col2
  424. File Output Operator
  425. compressed: false
  426. GlobalTableId: 1
  427. table:
  428. input format: org.apache.hadoop.mapred.TextInputFormat
  429. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  430. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  431. name: dest1
  432. Stage: Stage-0
  433. Move Operator
  434. tables:
  435. replace: true
  436. table:
  437. input format: org.apache.hadoop.mapred.TextInputFormat
  438. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  439. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  440. name: dest1










  441. set hive.map.aggr=true;
  442. set hive.groupby.skewindata=true;
  443. STAGE DEPENDENCIES:
  444. Stage-1 is a root stage
  445. Stage-2 depends on stages: Stage-1
  446. Stage-0 depends on stages: Stage-2
  447. STAGE PLANS:
  448. Stage: Stage-1
  449. Map Reduce
  450. Alias -> Map Operator Tree:
  451. input
  452. TableScan
  453. alias: input
  454. Select Operator
  455. expressions:
  456. expr: key
  457. type: int
  458. expr: value
  459. type: string
  460. outputColumnNames: key, value
  461. Group By Operator
  462. aggregations:
  463. expr: count(substr(value, 5))
  464. expr: count(DISTINCT substr(value, 5))
  465. bucketGroup: false
  466. keys:
  467. expr: key
  468. type: int
  469. expr: substr(value, 5)
  470. type: string
  471. mode: hash
  472. outputColumnNames: _col0, _col1, _col2, _col3
  473. Reduce Output Operator
  474. key expressions:
  475. expr: _col0
  476. type: int
  477. expr: _col1
  478. type: string
  479. sort order: ++
  480. Map-reduce partition columns:
  481. expr: _col0
  482. type: int
  483. tag: -1
  484. value expressions:
  485. expr: _col2
  486. type: bigint
  487. expr: _col3
  488. type: bigint
  489. Reduce Operator Tree:
  490. Group By Operator
  491. aggregations:
  492. expr: count(VALUE._col0)
  493. expr: count(DISTINCT KEY._col1:0._col0)
  494. bucketGroup: false
  495. keys:
  496. expr: KEY._col0
  497. type: int
  498. mode: partials
  499. outputColumnNames: _col0, _col1, _col2
  500. File Output Operator
  501. compressed: false
  502. GlobalTableId: 0
  503. table:
  504. input format: org.apache.hadoop.mapred.SequenceFileInputFormat
  505. output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
  506. Stage: Stage-2
  507. Map Reduce
  508. Alias -> Map Operator Tree:
  509. hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
  510. Reduce Output Operator
  511. key expressions:
  512. expr: _col0
  513. type: int
  514. sort order: +
  515. Map-reduce partition columns:
  516. expr: _col0
  517. type: int
  518. tag: -1
  519. value expressions:
  520. expr: _col1
  521. type: bigint
  522. expr: _col2
  523. type: bigint
  524. Reduce Operator Tree:
  525. Group By Operator
  526. aggregations:
  527. expr: count(VALUE._col0)
  528. expr: count(VALUE._col1)
  529. bucketGroup: false
  530. keys:
  531. expr: KEY._col0
  532. type: int
  533. mode: final
  534. outputColumnNames: _col0, _col1, _col2
  535. Select Operator
  536. expressions:
  537. expr: _col0
  538. type: int
  539. expr: _col1
  540. type: bigint
  541. expr: _col2
  542. type: bigint
  543. outputColumnNames: _col0, _col1, _col2
  544. Select Operator
  545. expressions:
  546. expr: _col0
  547. type: int
  548. expr: UDFToInteger(_col1)
  549. type: int
  550. expr: UDFToInteger(_col2)
  551. type: int
  552. outputColumnNames: _col0, _col1, _col2
  553. File Output Operator
  554. compressed: false
  555. GlobalTableId: 1
  556. table:
  557. input format: org.apache.hadoop.mapred.TextInputFormat
  558. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  559. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  560. name: dest1
  561. Stage: Stage-0
  562. Move Operator
  563. tables:
  564. replace: true
  565. table:
  566. input format: org.apache.hadoop.mapred.TextInputFormat
  567. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  568. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  569. name: dest1


















  570. set hive.map.aggr=false;
  571. set hive.groupby.skewindata=false;
  572. EXPLAIN extended
  573. FROM INPUT
  574. INSERT OVERWRITE TABLE dest1 SELECT INPUT.key,
  575. count(substr(INPUT.value, 5)), count(distinct substr(INPUT.value, 5))
  576. GROUP BY INPUT.key;
  577. STAGE DEPENDENCIES:
  578. Stage-1 is a root stage
  579. Stage-0 depends on stages: Stage-1
  580. STAGE PLANS:
  581. Stage: Stage-1
  582. Map Reduce
  583. Alias -> Map Operator Tree:
  584. input
  585. TableScan
  586. alias: input
  587. Select Operator
  588. expressions:
  589. expr: key
  590. type: int
  591. expr: value
  592. type: string
  593. outputColumnNames: key, value
  594. Reduce Output Operator
  595. key expressions:
  596. expr: key
  597. type: int
  598. expr: substr(value, 5)
  599. type: string
  600. sort order: ++
  601. Map-reduce partition columns:
  602. expr: key
  603. type: int
  604. tag: -1
  605. Needs Tagging: false
  606. Path -> Alias:
  607. hdfs://localhost:54310/user/hive/warehouse/input [input]
  608. Path -> Partition:
  609. hdfs://localhost:54310/user/hive/warehouse/input
  610. Partition
  611. base file name: input
  612. input format: org.apache.hadoop.mapred.TextInputFormat
  613. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  614. properties:
  615. bucket_count -1
  616. columns key,value
  617. columns.types int:string
  618. file.inputformat org.apache.hadoop.mapred.TextInputFormat
  619. file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  620. location hdfs://localhost:54310/user/hive/warehouse/input
  621. name input
  622. serialization.ddl struct input { i32 key, string value}
  623. serialization.format 1
  624. serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  625. transient_lastDdlTime 1310523947
  626. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  627. input format: org.apache.hadoop.mapred.TextInputFormat
  628. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  629. properties:
  630. bucket_count -1
  631. columns key,value
  632. columns.types int:string
  633. file.inputformat org.apache.hadoop.mapred.TextInputFormat
  634. file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  635. location hdfs://localhost:54310/user/hive/warehouse/input
  636. name input
  637. serialization.ddl struct input { i32 key, string value}
  638. serialization.format 1
  639. serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  640. transient_lastDdlTime 1310523947
  641. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  642. name: input
  643. name: input
  644. Reduce Operator Tree:
  645. Group By Operator
  646. aggregations:
  647. expr: count(KEY._col1:0._col0)
  648. expr: count(DISTINCT KEY._col1:0._col0)
  649. bucketGroup: false
  650. keys:
  651. expr: KEY._col0
  652. type: int
  653. mode: complete
  654. outputColumnNames: _col0, _col1, _col2
  655. Select Operator
  656. expressions:
  657. expr: _col0
  658. type: int
  659. expr: _col1
  660. type: bigint
  661. expr: _col2
  662. type: bigint
  663. outputColumnNames: _col0, _col1, _col2
  664. Select Operator
  665. expressions:
  666. expr: _col0
  667. type: int
  668. expr: UDFToInteger(_col1)
  669. type: int
  670. expr: UDFToInteger(_col2)
  671. type: int
  672. outputColumnNames: _col0, _col1, _col2
  673. File Output Operator
  674. compressed: false
  675. GlobalTableId: 1
  676. directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
  677. NumFilesPerFileSink: 1
  678. table:
  679. input format: org.apache.hadoop.mapred.TextInputFormat
  680. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  681. properties:
  682. bucket_count -1
  683. columns key,val1,val2
  684. columns.types int:int:int
  685. file.inputformat org.apache.hadoop.mapred.TextInputFormat
  686. file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  687. location hdfs://localhost:54310/user/hive/warehouse/dest1
  688. name dest1
  689. serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
  690. serialization.format 1
  691. serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  692. transient_lastDdlTime 1310523946
  693. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  694. name: dest1
  695. TotalFiles: 1
  696. MultiFileSpray: false
  697. Stage: Stage-0
  698. Move Operator
  699. tables:
  700. replace: true
  701. source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
  702. table:
  703. input format: org.apache.hadoop.mapred.TextInputFormat
  704. output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  705. properties:
  706. bucket_count -1
  707. columns key,val1,val2
  708. columns.types int:int:int
  709. file.inputformat org.apache.hadoop.mapred.TextInputFormat
  710. file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  711. location hdfs://localhost:54310/user/hive/warehouse/dest1
  712. name dest1
  713. serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
  714. serialization.format 1
  715. serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  716. transient_lastDdlTime 1310523946
  717. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  718. name: dest1
  719. tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001
  720. ABSTRACT SYNTAX TREE:
  721. (TOK_QUERY
  722. (TOK_FROM (TOK_TABREF INPUT))
  723. (TOK_INSERT
  724. (TOK_DESTINATION (TOK_TAB dest1))
  725. (TOK_SELECT
  726. (TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
  727. (TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
  728. (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
  729. )
  730. (TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
  731. )
  732. )








  733. SemanticAnalyzer.genBodyPlan(QB qb, Operator input){
  734. if (qbp.getAggregationExprsForClause(dest).size() != 0
  735. || getGroupByForClause(qbp, dest).size() > 0) { // if there is an aggregate function or a GROUP BY, do the following
  736. //multiple distincts is not supported with skew in data
  737. if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
  738. .equalsIgnoreCase("true") &&
  739. qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
  740. throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
  741. getMsg());
  742. }
  743. // insert a select operator here used by the ColumnPruner to reduce
  744. // the data to shuffle
  745. curr = insertSelectAllPlanForGroupBy(dest, curr); // generates a SelectOperator that selects all columns, selectStar=true
  746. if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
  747. .equalsIgnoreCase("true")) {
  748. if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
  749. .equalsIgnoreCase("false")) {
  750. curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
  751. } else {
  752. curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
  753. }
  754. } else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
  755. .equalsIgnoreCase("true")) {
  756. curr = genGroupByPlan2MR(dest, qb, curr);
  757. } else {
  758. curr = genGroupByPlan1MR(dest, qb, curr);
  759. }
  760. }
  761. }
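The branch structure in genBodyPlan boils down to a two-flag decision table. A sketch of just the selection logic (plain Java; choosePlan is a made-up helper, not a Hive method — the four generator names are the real ones from SemanticAnalyzer):

```java
public class GroupByPlanChooser {
    // Mirrors the branches in SemanticAnalyzer.genBodyPlan: the pair
    // (hive.map.aggr, hive.groupby.skewindata) picks the plan generator.
    static String choosePlan(boolean mapAggr, boolean skewInData) {
        if (mapAggr) {
            return skewInData ? "genGroupByPlanMapAggr2MR"  // hash -> partials -> final, 2 MR jobs
                              : "genGroupByPlanMapAggr1MR"; // hash -> mergepartial, 1 MR job
        }
        return skewInData ? "genGroupByPlan2MR"             // partial1 -> final, 2 MR jobs
                          : "genGroupByPlan1MR";            // complete, 1 MR job
    }

    public static void main(String[] args) {
        System.out.println(choosePlan(true, false)); // genGroupByPlanMapAggr1MR
    }
}
```

The four EXPLAIN outputs above are exactly these four combinations, which is why their reduce-side Group By modes differ (mergepartial, final, complete).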




  762. distinct:
  763. count.q.out
  764. groupby11.q.out
  765. groupby10.q.out
  766. nullgroup4_multi_distinct.q.out
  767. join18.q.out
  768. groupby_bigdata.q.out
  769. join18_multi_distinct.q.out
  770. nullgroup4.q.out
  771. auto_join18_multi_distinct.q.out
  772. auto_join18.q.out






  773. 1) Map-side partial aggregation, no data skew: one MR job is generated.
  774. genGroupByPlanMapAggr1MR generates three operators:
  775. 1.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:
  776. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
  777. Handle DISTINCT in the SELECT (getDistinctFuncExprsForClause): the DISTINCT columns are added to groupByKeys and outputColumnNames.
  778. Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
  779. public GroupByDesc(
  780. final Mode mode,
  781. final java.util.ArrayList<java.lang.String> outputColumnNames,
  782. final java.util.ArrayList<ExprNodeDesc> keys,
  783. final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
  784. final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  785. this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
  786. false, groupByMemoryUsage, memoryThreshold);
  787. }
  788. mode: GroupByDesc.Mode.HASH
  789. outputColumnNames: groupby + Distinct + Aggregation
  790. keys: groupby + Distinct
  791. aggregators: Aggregation
  792. groupKeyNotReductionKey: false
  793. groupByMemoryUsage: defaults to 0.5
  794. memoryThreshold: defaults to 0.9









  795. 1.2) ReduceSinkOperator
  796. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
  797. Handle DISTINCT in the SELECT (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
  798. Handle the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.
  799. public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
  800. int numDistributionKeys,
  801. java.util.ArrayList<ExprNodeDesc> valueCols,
  802. java.util.ArrayList<java.lang.String> outputKeyColumnNames,
  803. List<List<Integer>> distinctColumnIndices,
  804. java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
  805. java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
  806. final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  807. this.keyCols = keyCols; // reduceKeys: groupby + distinct
  808. this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
  809. this.valueCols = valueCols; // reduceValues: the aggregate functions
  810. this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
  811. this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
  812. this.tag = tag; // -1
  813. this.numReducers = numReducers; // usually -1
  814. this.partitionCols = partitionCols; // groupby
  815. this.keySerializeInfo = keySerializeInfo;
  816. this.valueSerializeInfo = valueSerializeInfo;
  817. this.distinctColumnIndices = distinctColumnIndices;
  818. }
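The important detail in this constructor is that keyCols (groupby + distinct) and partitionCols (groupby only) differ: rows are routed to reducers by the group-by key alone, but sorted by group-by key plus the distinct expression, so each reducer sees the distinct values of one group in sorted order and can count them in a single pass. A sketch of that shuffle behavior (plain Java, not Hive code; the Row type is made up for illustration):

```java
import java.util.*;

public class ShuffleSketch {
    // A shuffled row: partitioned by groupKey only, sorted by (groupKey, distinctVal).
    static class Row {
        final String groupKey, distinctVal;
        Row(String k, String v) { groupKey = k; distinctVal = v; }
        String groupKey() { return groupKey; }
        String distinctVal() { return distinctVal; }
    }

    // Count distinct values per group in one pass over sorted input,
    // the way the reduce-side GroupByOperator can after this ReduceSink.
    static Map<String, Integer> countDistinctSorted(List<Row> rows) {
        rows.sort(Comparator.comparing(Row::groupKey).thenComparing(Row::distinctVal));
        Map<String, Integer> counts = new LinkedHashMap<>();
        String prevKey = null, prevVal = null;
        for (Row r : rows) {
            boolean newGroup = !r.groupKey().equals(prevKey);
            if (newGroup || !r.distinctVal().equals(prevVal)) {
                counts.merge(r.groupKey(), 1, Integer::sum); // new distinct value for this group
            }
            prevKey = r.groupKey();
            prevVal = r.distinctVal();
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>(Arrays.asList(
            new Row("a", "x"), new Row("a", "x"), new Row("a", "y"), new Row("b", "x")));
        System.out.println(countDistinctSorted(rows)); // {a=2, b=1}
    }
}
```

No per-group hash set of distinct values is needed on the reduce side, which is what keeps memory bounded for count(DISTINCT ...).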






  819. 1.3) GroupByOperator
  820. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
  821. Handle the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.
  822. public GroupByDesc(
  823. final Mode mode,
  824. final java.util.ArrayList<java.lang.String> outputColumnNames,
  825. final java.util.ArrayList<ExprNodeDesc> keys,
  826. final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
  827. final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  828. this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
  829. false, groupByMemoryUsage, memoryThreshold);
  830. }
  831. mode: GroupByDesc.Mode.MERGEPARTIAL
  832. outputColumnNames: groupby + Aggregation
  833. keys: groupby
  834. aggregators: Aggregation
  835. groupKeyNotReductionKey: false
  836. groupByMemoryUsage: defaults to 0.5
  837. memoryThreshold: defaults to 0.9









  838. 2) Map-side partial aggregation, with data skew: two MR jobs are generated.
  839. genGroupByPlanMapAggr2MR:
  840. 2.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:
  841. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
  842. Handle DISTINCT in the SELECT (getDistinctFuncExprsForClause): the DISTINCT columns are added to groupByKeys and outputColumnNames.
  843. Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
  844. public GroupByDesc(
  845. final Mode mode,
  846. final java.util.ArrayList<java.lang.String> outputColumnNames,
  847. final java.util.ArrayList<ExprNodeDesc> keys,
  848. final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
  849. final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  850. this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
  851. false, groupByMemoryUsage, memoryThreshold);
  852. }
  853. mode: GroupByDesc.Mode.HASH
  854. outputColumnNames: groupby + Distinct + Aggregation
  855. keys: groupby + Distinct
  856. aggregators: Aggregation
  857. groupKeyNotReductionKey: false
  858. groupByMemoryUsage: defaults to 0.5
  859. memoryThreshold: defaults to 0.9









  860. 2.2) ReduceSinkOperator
  861. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
  862. Handle DISTINCT in the SELECT (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
  863. Handle the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.
  864. public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
  865. int numDistributionKeys,
  866. java.util.ArrayList<ExprNodeDesc> valueCols,
  867. java.util.ArrayList<java.lang.String> outputKeyColumnNames,
  868. List<List<Integer>> distinctColumnIndices,
  869. java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
  870. java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
  871. final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  872. this.keyCols = keyCols; // reduceKeys: groupby + distinct
  873. this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
  874. this.valueCols = valueCols; // reduceValues: the aggregate functions
  875. this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
  876. this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
  877. this.tag = tag; // -1
  878. this.numReducers = numReducers; // usually -1
  879. this.partitionCols = partitionCols; // groupby
  880. this.keySerializeInfo = keySerializeInfo;
  881. this.valueSerializeInfo = valueSerializeInfo;
  882. this.distinctColumnIndices = distinctColumnIndices;
  883. }






  884. 2.3) GroupByOperator
  885. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
  886. Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
  887. public GroupByDesc(
  888. final Mode mode,
  889. final java.util.ArrayList<java.lang.String> outputColumnNames,
  890. final java.util.ArrayList<ExprNodeDesc> keys,
  891. final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
  892. final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  893. this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
  894. false, groupByMemoryUsage, memoryThreshold);
  895. }
  896. mode: GroupByDesc.Mode.PARTIALS
  897. outputColumnNames: groupby + Aggregation
  898. keys: groupby
  899. aggregators: Aggregation
  900. groupKeyNotReductionKey: false
  901. groupByMemoryUsage: defaults to 0.5
  902. memoryThreshold: defaults to 0.9









  903. 2.4) ReduceSinkOperator
  904. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputColumnNames.
  905. Handle the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputColumnNames.
  906. public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
  907. int numDistributionKeys,
  908. java.util.ArrayList<ExprNodeDesc> valueCols,
  909. java.util.ArrayList<java.lang.String> outputKeyColumnNames,
  910. List<List<Integer>> distinctColumnIndices,
  911. java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
  912. java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
  913. final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  914. this.keyCols = keyCols; // reduceKeys: groupby
  915. this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
  916. this.valueCols = valueCols; // reduceValues: the aggregate functions
  917. this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
  918. this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
  919. this.tag = tag; // -1
  920. this.numReducers = numReducers; // usually -1
  921. this.partitionCols = partitionCols; // groupby
  922. this.keySerializeInfo = keySerializeInfo;
  923. this.valueSerializeInfo = valueSerializeInfo;
  924. this.distinctColumnIndices = distinctColumnIndices;
  925. }






  926. 2.5) GroupByOperator
  927. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
  928. Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is created and added to aggregations, and the columns to aggregate are added to outputColumnNames.
  929. public GroupByDesc(
  930. final Mode mode,
  931. final java.util.ArrayList<java.lang.String> outputColumnNames,
  932. final java.util.ArrayList<ExprNodeDesc> keys,
  933. final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
  934. final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  935. this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
  936. false, groupByMemoryUsage, memoryThreshold);
  937. }
  938. mode: GroupByDesc.Mode.FINAL
  939. outputColumnNames: groupby + Aggregation
  940. keys: groupby
  941. aggregators: Aggregation
  942. groupKeyNotReductionKey: false
  943. groupByMemoryUsage: defaults to 0.5
  944. memoryThreshold: defaults to 0.9
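The PARTIALS → FINAL pair above is what makes the skew setting work: the first job spreads the rows of a hot group across several reducers and emits partial aggregates; only the second job, partitioned by the group key alone, brings each group together. A toy simulation of the two stages for a plain count (plain Java, not Hive code; partitioning stage 1 by the full (key, value) pair is an illustrative stand-in for how the skewed shuffle spreads one key's rows):

```java
import java.util.*;

public class SkewTwoStage {
    // Stage 1 (mode=partials): rows of one hot group key are spread over
    // several reducers, each producing a partial count for that key.
    static List<Map<String, Long>> stage1(List<String[]> rows, int reducers) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < reducers; i++) partials.add(new HashMap<>());
        for (String[] r : rows) {
            int p = Math.abs((r[0] + "|" + r[1]).hashCode()) % reducers;
            partials.get(p).merge(r[0], 1L, Long::sum);
        }
        return partials;
    }

    // Stage 2 (mode=final): partition by the group key alone and merge partials.
    static Map<String, Long> stage2(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials)
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[]{"a", "x1"}, new String[]{"a", "x2"},
            new String[]{"a", "x3"}, new String[]{"b", "y1"});
        System.out.println(stage2(stage1(rows, 3)).get("a")); // 3
    }
}
```

No single reducer ever has to absorb all rows of the hot key "a" in stage 1, at the cost of a second shuffle of the (much smaller) partials.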











  945. 3) No map-side partial aggregation, with data skew: two MR jobs are generated.
  946. genGroupByPlan2MR:
  947. 3.1) ReduceSinkOperator
  948. Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
  949. Handle DISTINCT in the SELECT (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
  950. Handle the aggregate functions (getAggregationExprsForClause): the columns to aggregate are added to reduceValues and outputValueColumnNames.
  951. public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
  952. int numDistributionKeys,
  953. java.util.ArrayList<ExprNodeDesc> valueCols,
  954. java.util.ArrayList<java.lang.String> outputKeyColumnNames,
  955. List<List<Integer>> distinctColumnIndices,
  956. java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
  957. java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
  958. final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  959. this.keyCols = keyCols; // reduceKeys: groupby + distinct
  960. this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
  961. this.valueCols = valueCols; // reduceValues: the aggregate functions
  962. this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
  963. this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
  964. this.tag = tag; // -1
  965. this.numReducers = numReducers; // usually -1
  966. this.partitionCols = partitionCols; // groupby
  967. this.keySerializeInfo = keySerializeInfo;
  968. this.valueSerializeInfo = valueSerializeInfo;
  969. this.distinctColumnIndices = distinctColumnIndices;
  970. }






  971. 3.2


    )GroupByOperator



  972. 处理groupby子句,getGroupByForClause,groupby的column加入groupByKeys和outputColumnNames


  973. 处理聚合函数,getAggregationExprsForClause,生成AggregationDesc加入aggregations,生成column加入outputColumnNames



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage,
    float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}


mode: GroupByDesc.Mode.PARTIAL1
outputColumnNames: group-by + aggregation columns
keys: group-by columns
aggregators: the aggregations
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
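In PARTIAL1 (hash) mode the map-side GroupByOperator keeps a hash table of partial aggregates and emits one partial result per distinct key, which is what shrinks the data the reducer must handle. A self-contained sketch of the idea for SELECT uid, SUM(count) (hypothetical code, not Hive source):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of map-side partial aggregation (not Hive source).
public class MapSidePartialAgg {
    // Accumulate a partial SUM per group-by key; only one
    // (key, partial sum) pair per distinct key leaves the mapper.
    public static Map<String, Long> partialAggregate(String[][] rows) {
        Map<String, Long> hashAggr = new HashMap<>();
        for (String[] row : rows) {
            hashAggr.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return hashAggr;
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"}};
        // Partials per key: a -> 10, b -> 1
        System.out.println(partialAggregate(rows));
    }
}
```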









3.3) ReduceSinkOperator
Handle the group-by clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames,
    int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols,
    int numReducers,
    final TableDesc keySerializeInfo,
    final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: group-by columns
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: the aggregate functions
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // group-by columns
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






3.4) GroupByOperator
Handle the group-by clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the columns to be aggregated are added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage,
    float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}


mode: GroupByDesc.Mode.FINAL
outputColumnNames: group-by + aggregation columns
keys: group-by columns
aggregators: the aggregations
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
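In FINAL (mergepartial) mode the reduce-side GroupByOperator does not see raw rows: it receives the partial aggregates the mappers emitted and merges them into the final result. A sketch of that merge step (hypothetical code, not Hive source):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of reduce-side merging of partial sums (not Hive source).
public class MergePartialAgg {
    // Merge per-mapper partial results into the final SUM per key.
    @SafeVarargs
    public static Map<String, Long> merge(Map<String, Long>... partials) {
        Map<String, Long> finalAggr = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> finalAggr.merge(k, v, Long::sum));
        }
        return finalAggr;
    }

    public static void main(String[] args) {
        Map<String, Long> m1 = new HashMap<>();
        m1.put("a", 8L);          // partial from mapper 1
        Map<String, Long> m2 = new HashMap<>();
        m2.put("a", 2L);          // partials from mapper 2
        m2.put("b", 1L);
        // Final result: a -> 10, b -> 1
        System.out.println(merge(m1, m2));
    }
}
```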











4) No map-side partial aggregation, no data skew: a single MR job is generated.
genGroupByPlan1MR:




4.1) ReduceSinkOperator
Handle the group-by clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputKeyColumnNames.
Handle DISTINCT in the select list (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames.



public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames,
    int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols,
    int numReducers,
    final TableDesc keySerializeInfo,
    final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols;                          // reduceKeys: group-by + distinct columns
  this.numDistributionKeys = numDistributionKeys;  // grpByExprs.size()
  this.valueCols = valueCols;                      // reduceValues: the aggregate functions
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag;                                  // -1
  this.numReducers = numReducers;                  // usually -1
  this.partitionCols = partitionCols;              // group-by columns
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}






4.2) GroupByOperator
Handle the group-by clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the columns to be aggregated are added to outputColumnNames.



public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey,
    float groupByMemoryUsage,
    float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}


mode: GroupByDesc.Mode.COMPLETE
outputColumnNames: group-by + aggregation columns
keys: group-by columns
aggregators: the aggregations
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
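In COMPLETE mode (hive.map.aggr=false, single MR job) the reducer receives the raw rows rather than partials, already sorted by the group-by key, and computes the whole aggregate itself. A sketch of such a streaming pass over sorted input (hypothetical code, not Hive source):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of COMPLETE-mode aggregation (not Hive source):
// the reducer's input is sorted by the group-by key, so one streaming
// pass can emit each group's final sum when the key changes.
public class CompleteModeAgg {
    public static List<String> aggregateSorted(String[][] sortedRows) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        long sum = 0;
        for (String[] row : sortedRows) {
            if (currentKey != null && !currentKey.equals(row[0])) {
                out.add(currentKey + "\t" + sum);  // key changed: group finished
                sum = 0;
            }
            currentKey = row[0];
            sum += Long.parseLong(row[1]);
        }
        if (currentKey != null) {
            out.add(currentKey + "\t" + sum);      // flush the last group
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "5"}, {"a", "3"}, {"a", "2"}, {"b", "1"}};
        // One line per group: "a\t10", "b\t1"
        System.out.println(aggregateSorted(rows));
    }
}
```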











SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy (multi group-by with the same distinct)
groupby10.q, groupby11.q