Usage of and differences between repartition and coalesce in Spark


I. Comparison

1 Functional comparison: under the hood, repartition simply delegates to coalesce with the shuffle parameter hard-coded to true, i.e. coalesce(numPartitions, shuffle = true), so it always performs a shuffle (whereas coalesce itself defaults to shuffle = false). See the source code in Section II below.

2 Use-case comparison:

Suppose an RDD with N partitions needs to be redivided into M partitions.

Scenario 1: if N > M and the two are of a similar order of magnitude (e.g. reducing 1000 partitions to 100), use coalesce(numPartitions, shuffle = false); the resulting narrow dependency avoids a shuffle entirely.

Scenario 2: if N < M (growing the partition count requires a shuffle), or N >> M (a drastic reduction such as 1000 partitions to 1, where a shuffle-free coalesce would collapse upstream parallelism onto very few nodes), prefer repartition, as in the sketch below.
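To make the two scenarios concrete, here is a minimal runnable sketch; the local[*] master, the sample data, and the exact partition counts are illustrative assumptions, not from the original post:

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionVsCoalesceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))

    // Start from an RDD with N = 1000 partitions.
    val rdd = sc.parallelize(0 until 100000, 1000)

    // Scenario 1: N > M with a similar order of magnitude (1000 -> 100).
    // shuffle = false (the default): a narrow dependency, each of the 100
    // new partitions claims ~10 old ones and no data crosses the network.
    val reduced = rdd.coalesce(100)
    println(reduced.getNumPartitions)  // 100

    // Scenario 2: N < M (100 -> 1000). Growing the partition count needs
    // a shuffle, so use repartition (i.e. coalesce(1000, shuffle = true)).
    val grown = reduced.repartition(1000)
    println(grown.getNumPartitions)    // 1000

    sc.stop()
  }
}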

II. Source code

1 repartition source code

/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
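Because repartition is just this one-line delegation, the two calls below are interchangeable. The snippet can be pasted into spark-shell, where sc is predefined; the partition counts are illustrative:

// In spark-shell, sc is already available.
val rdd = sc.parallelize(1 to 1000, 10)

// Both trigger a full shuffle and produce 200 partitions:
val viaRepartition = rdd.repartition(200)
val viaCoalesce    = rdd.coalesce(200, shuffle = true)
assert(viaRepartition.getNumPartitions == viaCoalesce.getNumPartitions)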

2 coalesce source code

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
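To see why the shuffle branch spreads data evenly, here is a minimal plain-Scala sketch of the same keying idea, with no Spark dependency; targetPartitions is a hypothetical helper mimicking distributePartition plus the HashPartitioner mod:

import scala.util.Random

// Each input partition starts at a random output slot (seeded by its index),
// then assigns its elements round-robin; HashPartitioner then takes the key
// modulo numPartitions, since an Int key hashes to itself.
def targetPartitions(index: Int, items: Iterator[Int], numPartitions: Int): Iterator[Int] = {
  var position = new Random(index).nextInt(numPartitions)
  items.map { _ =>
    position += 1
    position % numPartitions  // what HashPartitioner does for a non-negative Int key
  }
}

// 20 elements from input partition 0, spread across 4 output partitions:
println(targetPartitions(0, Iterator.range(0, 20), 4).toList)
// e.g. List(1, 2, 3, 0, 1, 2, 3, 0, ...) -- an even round-robin spread

Starting each input partition at a random offset rather than at slot 0 prevents every partition from sending its first elements to the same output partition.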



Copyright notice: this is an original article by dalang_1234, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original article and this notice.