MapReduce merge机制

  • Post author:
  • Post category:其他




MapReduce merge机制



概述

在map端和reduce端都会有merge过程,将segments进行多路归并成一个大的segment。在map端,一个spill-N.out文件的每个partition在merge阶段使用一个segment代表。



merge过程



粗略过程:

从segments中每次remove出mergeFactor个segment进行归并,归并为一个大的segment,结束后将该segment插入segments中,继续该过程,直到segments.size<=mergeFactor,对其进行最后一个多路merge即可。

在这里插入图片描述



详细过程:
  • 读取设置的mergeFactor(


    mapreduce.task.io.sort.factor


    ),默认是

    10

    。mergeFactor表示多路归并的路数,即每一次将mergeFactor个segment归并。
  • 构造segment集合。例如:在map端会为每一个spill-N.out文件的当前待合并的partition,构造一个segment,加入到segments(list<segment>)中。
  • 如果segments.size>mergeFactor,会对segments进行排序(

    按segmentLength升序

    ),先归并length小的segment可以减少归并总的比较次数。如果不满足,则不需要排序(因为一次多路merge可以处理segments集合里的所有segment,所以次序不影响)。
  • 构造一个PriorityQueue,逻辑结构是

    小顶堆

    ,物理结构是

    数组

    (下标从1开始,方便表示父子关系)。在segments中remove出最靠前的mergeFactor个segment,put到该PriorityQueue;一个segment的大小通过其当前待处理的(key/value)的key大小决定。
  • 对上述的mergeFactor个segment进行归并。
  1. ​ 通过PriorityQueue选出minsegment,然后将其(key/value)输出到Intermediate-passNo.out文件中;
  2. ​ 输出后如果minsegment没有下一个(key/value),则将其弹出;
  3. ​ 调整PriorityQueue。
  4. ​ 重复进行上述过程,最终Intermediate-passNo.out将保存着该次多路merge的结果。
  • 为Intermediate-passNo.out设置一个代表它的segment加入到segments中(加入后还是按segmentLength将其放到合适位置),然后重新进行上述过程,直到segments.size<mergeFactor;返回PriorityQueue。
  • 用返回的进行一次多路merge,并写出到目标文件即可。



Merger代码流程

在这里插入图片描述



PriorityQueue进行多路归并

在这里插入图片描述



Bug

如下代码有一个bug,当factor设置为1,且初始的numSegments>1,inMem=0时,会使得while死循环。

可以通过设置configuration.set(“mapreduce.task.io.sort.factor”,“1”),就会很容易产生该bug(虽然将factor设置为1没有什么意义,也不会这么做)。

//Merger$MergeQueue#computeBytesInMerges(int factor, int inMem)
long computeBytesInMerges(int factor=1, int inMem) {
   int numSegments = segments.size();
   int n = numSegments - inMem; //>1
      // factor for 1st pass
   int f = getPassFactor(factor, 1, n) + inMem// =1
   n = numSegments  //>1
  // If includeFinalMerge is true, allow the following while loop iterate
  // for 1 more iteration. This is to include final merge as part of the
  // computation of expected input bytes of merges
  boolean considerFinalMerge = includeFinalMerge;
  while (n > f || considerFinalMerge) {
    if (n <=f ) {
      considerFinalMerge = false;
    }
    long mergedSize = 0;
    f = Math.min(f, segmentSizes.size());
    for (int j = 0; j < f; j++) {
      mergedSize += segmentSizes.remove(0);
    }
    totalBytes += mergedSize;
    
    // insert new size into the sorted list
    int pos = Collections.binarySearch(segmentSizes, mergedSize);
    if (pos < 0) {
      pos = -pos-1;
    }
    segmentSizes.add(pos, mergedSize);
    //产生原因当f=1是,n不会减小(n-=0)。
    //因为最初n>f,所以n>f将一直成立。循环不会退出。
    n -= (f-1);
    f = factor;
  }

  return totalBytes;
}



版权声明:本文为weixin_43955361原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。