shuffle的作用是什么?
可以理解为将集群中所有节点上的数据进行重新整合分类的过程
shuffle为什么耗时?
shuffle需要对数据进行重新聚合和划分,然后分配到集群的各个节点进行下一个stage操作。不同节点间传输大量数据,会有大量的网络传输消耗。
spark的shuffle两种实现
在spark1.2之前,默认的shuffle是HashShuffle。该shuffle有一个严重的弊端,会产生大量的中间磁盘文件,进而由大量的磁盘IO影响性能
因此在spark1.2之后,默认的shuffle就改为sortShuffle了。改进在于:每个task在进行shuffle操作的时候,虽然也会产生较多的临时磁盘文件,但是最后将所有的临时文件合并(merge)成一个磁盘文件,并且有一个与之对应的索引文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的数据即可。
hashShuffle过程
shuffle write:每一个map task将数据分区(对key执行hash算法取余reduce任务个数),从而将相同的key的数据写入同一个buffer中,最终一个buffer文件对应一个磁盘文件
shuffle read:通常是一个stage刚开始要做的事情。此时stage的每一个task需要将上一个stage的计算结果中拉取属于自己的磁盘文件。每个read task会有自己的buffer缓冲,每次拉取与buffer缓冲大小相同的数据,通过内存中的map进行聚合操作。聚合完一批数据,在拉取下一批数据,知道最后所有数据拉去完,得到最终结果。
hashshuffle普通机制的问题
:shuffle write阶段每个task都会产生对应reduce task数量的小文件
,此时会产生大量耗时抵消的IO操作
合并机制的hashshuffle(优化后的shuffle)
优化的地方是:现在每个Executor(Executor内部可能对应多个task)输出小文件的数量是reduce task的数量。比普通的hashshuffle小文件数量少了很多。
hashshuffle生成文件个数总结:
普通的hashshuffle:
第一个stage中有M个task 第二个stage有N个task 结果会生成M
N个文件
优化的hashShuffle:
有M个Executor,第二个stage有N个task,结果会生成N
M个文件
sortshuffle
sortshufflemanager的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制。
bypass运行机制触发条件:
(1)当shuffle read task 的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认是200个)
(2)不是聚合类算子(reduceByKey)
sortshuffle的普通机制过程:
(1)写入内存数据结构
数据会先写入一个内存数据结构中(默认是5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是聚合类操作,选用map数据结构,一边聚合一边写入内存,如果是join,那么就选用Array的数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘(会先写到内存缓冲区),然后清空内存数据结构。
(2)排序
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
(3)溢写
对于排序之后的数据 ,会分批写入磁盘文件,数据会以每批一万条写入磁盘文件。首先会将数据缓存在内存缓冲区中,当内存缓冲区满了之后再一次写入磁盘文件,这样可以减少磁盘IO次数,提升性能。
(4)merge归并排序
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就是产生多个临时文件。最后将所有的临时磁盘文件都进行合并,写入最终的磁盘文件中。再写一份索引文件,标识了下游各个task需要的数据在这个磁盘文件中的start offset和end offset
注意:一个map task会产生一个索引文件和磁盘大文件
sortshuffle的bypass机制:
和sortshuffle的普通机制不一样的地方是,(1)写机制(shuffle write)直接写入内存缓冲区,没有内存数据结构了,因为bypass的触发条件之一就是不能是聚合类算子。(2)不会进行排序,节省了这部分的性能开销
sortShuffle的普通机制和bypass机制生成文件个数都是:
第一个stage有N个task 最后生成N个数据文件和N个索引文件