MapReduce merge机制
概述
在map端和reduce端都会有merge过程,将segments进行多路归并成一个大的segment。在map端,一个spill-N.out文件的每个partition在merge阶段使用一个segment代表。
merge过程
粗略过程:
从segments中每次remove出mergeFactor个segment进行归并,归并为一个大的segment,结束后将该segment插入segments中,继续该过程,直到segments.size<=mergeFactor,对其进行最后一个多路merge即可。
详细过程:
-
读取设置的mergeFactor(
mapreduce.task.io.sort.factor
),默认是
10
。mergeFactor表示多路归并的路数,即每一次将mergeFactor个segment归并。 - 构造segment集合。例如:在map端会为每一个spill-N.out文件的当前待合并的partition,构造一个segment,加入到segments(list<segment>)中。
-
如果segments.size>mergeFactor,会对segments进行排序(
按segmentLength升序
),先归并length小的segment可以减少归并总的比较次数。如果不满足,则不需要排序(因为一次多路merge可以处理segments集合里的所有segment,所以次序不影响)。 -
构造一个PriorityQueue,逻辑结构是
小顶堆
,物理结构是
数组
(下标从1开始,方便表示父子关系)。在segments中remove出最靠前的mergeFactor个segment,put到该PriorityQueue;一个segment的大小通过其当前待处理的(key/value)的key大小决定。 - 对上述的mergeFactor个segment进行归并。
- 通过PriorityQueue选出minsegment,然后将其(key/value)输出到Intermediate-passNo.out文件中;
- 输出后如果minsegment没有下一个(key/value),则将其弹出;
- 调整PriorityQueue。
- 重复进行上述过程,最终Intermediate-passNo.out将保存着该次多路merge的结果。
- 为Intermediate-passNo.out设置一个代表它的segment加入到segments中(加入后还是按segmentLength将其放到合适位置),然后重新进行上述过程,直到segments.size<mergeFactor;返回PriorityQueue。
- 用返回的进行一次多路merge,并写出到目标文件即可。
Merger代码流程
PriorityQueue进行多路归并
Bug
如下代码有一个bug,当factor设置为1,且初始的numSegments>1,inMem=0时,会使得while死循环。
可以通过设置configuration.set(“mapreduce.task.io.sort.factor”,“1”),就会很容易产生该bug(虽然将factor设置为1没有什么意义,也不会这么做)。
//Merger$MergeQueue#computeBytesInMerges(int factor, int inMem)
long computeBytesInMerges(int factor=1, int inMem) {
int numSegments = segments.size();
int n = numSegments - inMem; //>1
// factor for 1st pass
int f = getPassFactor(factor, 1, n) + inMem// =1
n = numSegments //>1
// If includeFinalMerge is true, allow the following while loop iterate
// for 1 more iteration. This is to include final merge as part of the
// computation of expected input bytes of merges
boolean considerFinalMerge = includeFinalMerge;
while (n > f || considerFinalMerge) {
if (n <=f ) {
considerFinalMerge = false;
}
long mergedSize = 0;
f = Math.min(f, segmentSizes.size());
for (int j = 0; j < f; j++) {
mergedSize += segmentSizes.remove(0);
}
totalBytes += mergedSize;
// insert new size into the sorted list
int pos = Collections.binarySearch(segmentSizes, mergedSize);
if (pos < 0) {
pos = -pos-1;
}
segmentSizes.add(pos, mergedSize);
//产生原因当f=1是,n不会减小(n-=0)。
//因为最初n>f,所以n>f将一直成立。循环不会退出。
n -= (f-1);
f = factor;
}
return totalBytes;
}
版权声明:本文为weixin_43955361原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。