spark的hashshuffle和sortShuffle详解及区别

  • Post author:
  • Post category:其他



shuffle的作用是什么?


可以理解为将集群中所有节点上的数据进行重新整合分类的过程


shuffle为什么耗时?


shuffle需要对数据进行重新聚合和划分,然后分配到集群的各个节点进行下一个stage操作。不同节点间传输大量数据,会有大量的网络传输消耗。


spark的shuffle两种实现


在spark1.2之前,默认的shuffle是HashShuffle。该shuffle有一个严重的弊端,会产生大量的中间磁盘文件,进而由大量的磁盘IO影响性能

因此在spark1.2之后,默认的shuffle就改为sortShuffle了。改进在于:每个task在进行shuffle操作的时候,虽然也会产生较多的临时磁盘文件,但是最后将所有的临时文件合并(merge)成一个磁盘文件,并且有一个与之对应的索引文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的数据即可。


hashShuffle过程


shuffle write:每一个map task将数据分区(对key执行hash算法取余reduce任务个数),从而将相同的key的数据写入同一个buffer中,最终一个buffer文件对应一个磁盘文件

shuffle read:通常是一个stage刚开始要做的事情。此时stage的每一个task需要将上一个stage的计算结果中拉取属于自己的磁盘文件。每个read task会有自己的buffer缓冲,每次拉取与buffer缓冲大小相同的数据,通过内存中的map进行聚合操作。聚合完一批数据,在拉取下一批数据,知道最后所有数据拉去完,得到最终结果。


hashshuffle普通机制的问题

:shuffle write阶段每个task都会产生对应reduce task数量的小文件

,此时会产生大量耗时抵消的IO操作


合并机制的hashshuffle(优化后的shuffle)


优化的地方是:现在每个Executor(Executor内部可能对应多个task)输出小文件的数量是reduce task的数量。比普通的hashshuffle小文件数量少了很多。


hashshuffle生成文件个数总结:


普通的hashshuffle:

第一个stage中有M个task 第二个stage有N个task 结果会生成M

N个文件

优化的hashShuffle:

有M个Executor,第二个stage有N个task,结果会生成N

M个文件


sortshuffle


sortshufflemanager的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制。


bypass运行机制触发条件:


(1)当shuffle read task 的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认是200个)

(2)不是聚合类算子(reduceByKey)


sortshuffle的普通机制过程:

在这里插入图片描述

(1)写入内存数据结构

数据会先写入一个内存数据结构中(默认是5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是聚合类操作,选用map数据结构,一边聚合一边写入内存,如果是join,那么就选用Array的数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘(会先写到内存缓冲区),然后清空内存数据结构。

(2)排序

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。

(3)溢写

对于排序之后的数据 ,会分批写入磁盘文件,数据会以每批一万条写入磁盘文件。首先会将数据缓存在内存缓冲区中,当内存缓冲区满了之后再一次写入磁盘文件,这样可以减少磁盘IO次数,提升性能。

(4)merge归并排序

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就是产生多个临时文件。最后将所有的临时磁盘文件都进行合并,写入最终的磁盘文件中。再写一份索引文件,标识了下游各个task需要的数据在这个磁盘文件中的start offset和end offset

注意:一个map task会产生一个索引文件和磁盘大文件


sortshuffle的bypass机制:


和sortshuffle的普通机制不一样的地方是,(1)写机制(shuffle write)直接写入内存缓冲区,没有内存数据结构了,因为bypass的触发条件之一就是不能是聚合类算子。(2)不会进行排序,节省了这部分的性能开销


sortShuffle的普通机制和bypass机制生成文件个数都是:


第一个stage有N个task 最后生成N个数据文件和N个索引文件



版权声明:本文为weixin_43806056原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。