2.2、从集合中创建rdd的分区数据如何划分
-
代码案例
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4,5),3) rdd.glom().collect() // 分区 [1] [2,3] [4,5]
-
源码解析
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } /* withScope { parallelize(seq, numSlices) } parallelize 并行度点进去 */ def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } /* 可以看到该方法new 了一个rdd对象 new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) 点进去 */ override def getPartitions: Array[Partition] = { val slices = ParallelCollectionRDD.slice(data, numSlices).toArray slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray } /* 该类中有 getPartitions 这个方法; ParallelCollectionRDD.slice(data, numSlices).toArray 数据切分 点进去 */ /** * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection * is an inclusive Range, we use inclusive range for the last slice. 将集合切片为 numSlices 子集合。我们在这里做的另一件事是特别对待 Range * 集合,将切片编码为其他 Ranges 以最小化内存成本。这使得 * 在表示大量数字的 RDD 上运行 Spark 变得高效。如果集合 * 是一个包含范围,我们对最后一个切片使用包含范围。 */ def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } seq match { case r: Range => positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { new Range.Inclusive(r.start + start * r.step, r.end, r.step) } else { new Range(r.start + start * r.step, r.start + end * r.step, r.step) } }.toSeq.asInstanceOf[Seq[Seq[T]]] case nr: NumericRange[_] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) var r = nr for ((start, end) <- positions(nr.length, numSlices)) { val sliceSize = end - start slices += r.take(sliceSize).asInstanceOf[Seq[T]] r = r.drop(sliceSize) } slices case _ => val array = seq.toArray // To prevent O(n^2) operations for List etc positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq } } /* 该方法中有 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {} 和一个模式匹配:seq match {} 我们这里是使用的列表 case _ => val array = seq.toArray // 获取列表长度 ,分区个数 positions(array.length, numSlices).map { case (start, end) => // 开始数据拆分 array.slice(start, end).toSeq }.toSeq position(获取列表长度,分区个数) // 0 到 分区个数 不包含遍历; 可以理解为遍历分区,角标从0开始 (0 until numSlices).iterator.map { i => // 分区的编号 * 数据列表额长度/总分区数 val start = ((i * length) / numSlices).toInt // (分区的编号+1) * 数据列表额长度/总分区数 val end = (((i + 1) * length) / numSlices).toInt // 返回数据角标开始位置和结束位置。 (start, end) } */ /* // 开始数据拆分 array.slice(start, end).toSeq 进入该方法 如下面所示 slice(from: Int, until: Int) from ---> until 含头不含未式切分数据 */ def slice(from: Int, until: Int): Repr = { val lo = math.max(from, 0) val hi = math.min(math.max(until, 0), length) val elems = math.max(hi - lo, 0) val b = newBuilder b.sizeHint(elems) var i = lo while (i < hi) { b += self(i) i += 1 } b.result() }
-
总结
集合中创建的RDD分区数据划分根据 数组长度和分区个数决定
遍历分区,编号从0开始
计算每个分区数据的开始下标和结束下标
i 为分区编号,numSlices为分区个数,length为数组长度
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt根据(start ,end ) 下标含头不含尾将数据取出。放在每个分区中。
版权声明:本文为qq_34446614原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。