Spark 源码解读02—(集合生成的)rdd的分区数据划分

  • Post author:
  • Post category:其他




2.2、从集合中创建rdd的分区数据如何划分

  • 代码案例

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(1,2,3,4,5),3)
    rdd.glom().collect()
    // 分区 [1] [2,3] [4,5]
    
  • 源码解析

    def makeRDD[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        parallelize(seq, numSlices)
      }
    /*
    withScope {
        parallelize(seq, numSlices) }
    parallelize  并行度点进去
    */
      def parallelize[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      }
    /*
    可以看到该方法new 了一个rdd对象  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    点进去
    */
      override def getPartitions: Array[Partition] = {
        val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
        slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
      }
    /*
    该类中有 getPartitions 这个方法;
    ParallelCollectionRDD.slice(data, numSlices).toArray 数据切分
    点进去
    */
     /**
       * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
       * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
       * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
       * is an inclusive Range, we use inclusive range for the last slice.
       将集合切片为 numSlices 子集合。我们在这里做的另一件事是特别对待 Range * 集合,将切片编码为其他 Ranges 以最小化内存成本。这使得 * 在表示大量数字的 RDD 上运行 Spark 变得高效。如果集合 * 是一个包含范围,我们对最后一个切片使用包含范围。
       */
    def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
        if (numSlices < 1) {
          throw new IllegalArgumentException("Positive number of partitions required")
        }
        // Sequences need to be sliced at the same set of index positions for operations
        // like RDD.zip() to behave as expected
        def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
          (0 until numSlices).iterator.map { i =>
            val start = ((i * length) / numSlices).toInt
            val end = (((i + 1) * length) / numSlices).toInt
            (start, end)
          }
        }
        seq match {
          case r: Range =>
            positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
              // If the range is inclusive, use inclusive range for the last slice
              if (r.isInclusive && index == numSlices - 1) {
                new Range.Inclusive(r.start + start * r.step, r.end, r.step)
              }
              else {
                new Range(r.start + start * r.step, r.start + end * r.step, r.step)
              }
            }.toSeq.asInstanceOf[Seq[Seq[T]]]
          case nr: NumericRange[_] =>
            // For ranges of Long, Double, BigInteger, etc
            val slices = new ArrayBuffer[Seq[T]](numSlices)
            var r = nr
            for ((start, end) <- positions(nr.length, numSlices)) {
              val sliceSize = end - start
              slices += r.take(sliceSize).asInstanceOf[Seq[T]]
              r = r.drop(sliceSize)
            }
            slices
          case _ =>
            val array = seq.toArray // To prevent O(n^2) operations for List etc
            positions(array.length, numSlices).map { case (start, end) =>
                array.slice(start, end).toSeq
            }.toSeq
        }
      }
    /*
    该方法中有 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {}
    和一个模式匹配:seq match {}
    我们这里是使用的列表
    case _ =>
            val array = seq.toArray 
            // 获取列表长度 ,分区个数
            positions(array.length, numSlices).map { case (start, end) =>
                // 开始数据拆分 
                array.slice(start, end).toSeq
            }.toSeq
            
    position(获取列表长度,分区个数)
    // 0 到 分区个数 不包含遍历; 可以理解为遍历分区,角标从0开始
    (0 until numSlices).iterator.map { i =>
            // 分区的编号 * 数据列表额长度/总分区数
            val start = ((i * length) / numSlices).toInt
            // (分区的编号+1) * 数据列表额长度/总分区数
            val end = (((i + 1) * length) / numSlices).toInt
            // 返回数据角标开始位置和结束位置。
            (start, end)
          }
    */
    /*
    // 开始数据拆分 
     array.slice(start, end).toSeq
    进入该方法
    如下面所示 slice(from: Int, until: Int)
    from ---> until 含头不含未式切分数据
    */
    def slice(from: Int, until: Int): Repr = {
        val lo    = math.max(from, 0)
        val hi    = math.min(math.max(until, 0), length)
        val elems = math.max(hi - lo, 0)
        val b     = newBuilder
        b.sizeHint(elems)
    
        var i = lo
        while (i < hi) {
          b += self(i)
          i += 1
        }
        b.result()
      }
    
  • 总结

    集合中创建的RDD分区数据划分根据 数组长度和分区个数决定

    遍历分区,编号从0开始

    计算每个分区数据的开始下标和结束下标

    i 为分区编号,numSlices为分区个数,length为数组长度

    val start = ((i * length) / numSlices).toInt

    val end = (((i + 1) * length) / numSlices).toInt

    根据(start ,end ) 下标含头不含尾将数据取出。放在每个分区中。


2.1、从集合中创建rdd的分区个数


2.2、从集合中创建rdd的分区数据如何划分


2.3、从外部存储(文件)创建rdd的个数


2.4、从外部存储(文件)创建rdd的数据如何划分



版权声明:本文为qq_34446614原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。