Spark两种核心Shuffle(HashShuffle与sortShuffle)

  • Post author:
  • Post category:其他



SparkShuffle:

SparkShuffle概念

reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合起来的value。


问题:

聚合之前,每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,RDD的partition极有可能分布在各个节点上。

HashShuffle:

HashShuffle是最开始存在Shuffle机制,在2.0版本以后就弃用了,下面我们了解一下。我们要知道有多少个reduce task就有多少个buffer

1)普通机制:

执行流程:

  1. 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。
  2. 每个buffer文件最后对应一个磁盘小文件。
  3. reduce task来拉取对应的磁盘小文件。

存在问题:

1. 小文件过多,耗时低效的IO操作

2.OOM,读写文件以及缓存过多

2)进行优化:

每个core中的所有task对应一个Buffler内存,这样就减少了落地小文件和IO开销,不过当reduce tast有好多的时候,显然进行优化的方式也不好,于是延伸出了sortBuffle

SortShuffle:

SortShuffle的运行机制主要分成两种: 普通运行机制       bypass运行机制

1)普通运行机制:

  • 执行流程
  1. map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
  2. 在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
  3. 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
  4. 在溢写之前内存结构中的数据会进行排序分区
  5. 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据,
  6. map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。
  7. reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

2)bypass机制

比如reducebykey算子用的就是普通机制shuffle,sortbykey算子、repartition算子用的就是bypass机制



版权声明:本文为Romantic_sir原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。