shuffle原理和内存溢出原因

  • Post author:
  • Post category:其他


错误异常:Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher

Redue Shuffle过程及参数:

先附上一个MR整个过程的大图:

过程:

1.EventFetcher 负责向MRAppMaster获取已经运行完的Map信息,这些信息包括Map编号和运行Map的服务器。

2.ShuffleScheduler负责调度Shuffle任务。

3.各Fetcher线程从ShuffleScheduler取任务,进行实际Map数据获取。默认5个Fetcher线程 。

参数:

mapreduce.reduce.shuffle.input.buffer.percent:

shuffle使用的内存比例,默认是0.7。Shuffle内存为总内存 * 0.7。

mapreduce.reduce.shuffle.memory.limit.percent:

单个shuffle任务能使用的内存限额,默认是0.25,即为 Shuffle内存 * 0.25。

低于此值可以输出到内存,否则输出到磁盘。

mapreduce.reduce.shuffle.merge.percent:默认值为0.9。

shuffle的数据量到Shuffle内存 * 0.9的时候,启动合并。

他的过程是最多5个Fetcher线程取拿取maptask输出的数据,Fetcher 线程获取Map的ShuffleHead信息后,通过调用merger.reserve(mapId, decompressedLength, id); 。merge返回InMemoryMapOutput或者是OnDiskMapOutput对象, 如果返回null“,则再从ShuffleScheduler取新的任务。

Fetcher线程取到数据后,进行mapOutput的commit操作,说明信息读结束,这个mapOutput可以和其他的mapOutput进行合并。内存空间分给Fetcher后,状态变为allocated,commit后变为committed,只有commit状态的内存可以merge,

如果前4个Fetcher已经使用了全部的shuffle内存的99%,第5个Fetcher取的数据接近单个shuffle任务能使用的内存限额。没有fetcher commit。这时会为第5个fetcher分配25%的内存。使分配内存达到shuffle内存的124%,内存溢出。

前面的4个fetcher已经使用shuffle内存的89,并已经commit。这时Merge不会启动。最后一个Fetcher的数据量接近单个shuffle任务能使用的内存限额。这时总shuffle使用量为114%,内存溢出。


MergeManager的reserve的处理如下:

public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,

long requestedSize,

int fetcher

) throws IOException {


if (requestedSize > maxSingleShuffleLimit) {


return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,

jobConf, mapOutputFile, fetcher, true);

}

if (usedMemory > memoryLimit) {


return null;

}

return unconditionalReserve(mapId, requestedSize, true);

}

unconditionalReserve方法如下: 增加usedMemory,返回InMemoryMapOutput对象。

private synchronized InMemoryMapOutput<K, V> unconditionalReserve(

TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {


usedMemory += requestedSize;

return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,

codec, primaryMapOutput);

}


此代码问题如下:

如果前4个Fetcher已经使用了全部的shuffle内存的99%,第5个Fetcher取的数据接近单个shuffle任务能使用的内存限额。没有fetcher commit。这时会为第5个fetcher分配25%的内存。使分配内存达到shuffle内存的124%,内存溢出。

前面的4个fetcher已经使用shuffle内存的89,并已经commit。这时Merge不会启动。最后一个Fetcher的数据量接近单个shuffle任务能使用的内存限额。这时总shuffle使用量为114%,内存溢出。

为了解决此问题,需要进行调整。目标是Shuffle内存占用总内存的比例不能超过70%,否则会出现OutOfMemoryError



方案一


保持shuffle内存0.7不变,则commit内存改为 0.75。同时修改reserve程序。

在reserve方法,如果usedMemory 小于Shuffle内存的75%总是能分配成功。

当大于75%的时候,则可能成功,也可能失败。但是当已经分配75%的时候,当这些Fetcher的任务结束commit 内存时,总能触发merge操作。merge后会释放内存。

为了提高系统效率,可以设置mapreduce.reduce.shuffle.merge.percent 为0.5,commit内存到0.5的时候,则启动merge,这时和fetcher申请内存时冲突的机会降低。即便 Reduce为2G内存,则merge时的数据量最少为: 2G * 0.7 * 0.5 为700MB。如果 Reduce 增大,则一次Merge的数量量更多。

mapreduce.reduce.shuffle.input.buffer.percent:

shuffle使用的内存比例,设置为0.7。

mapreduce.reduce.shuffle.memory.limit.percent:

单个shuffle任务能使用的内存限额,设置为0.25,即为 Shuffle内存 * 0.25。

低于此值可以输出到内存,否则输出到磁盘。

mapreduce.reduce.shuffle.merge.percent:设置为0.75。

shuffle的数据量到Shuffle内存 ** 0.75的时候,启动合并。


reserve方法改为

if (usedMemory  + requestedSize > memoryLimit) {   // 原来为  if (usedMemory  > memoryLimit) {


return null;

}


方案二


shuffle内存 比例0.6,单个shuffle最大为0.15, 则merge的内存比例不用改,reserve方法不用改. 这种方案shuffle内存分配到接近100%时,最多可以分配15%的shuffle内存。总得Shuffle内存不超过0.6 + 0.6 * 0.15 = 0.69。

在reserve方法,如果usedMemory 小于Shuffle内存的100%总是能分配成功,否则失败。但是当已经分配100%的时候,当这些Fetcher的任务结束commit 内存时,总能触发merge操作。merge后会释放内存。

mapreduce.reduce.shuffle.input.buffer.percent:

shuffle使用的内存比例0.6。

mapreduce.reduce.shuffle.memory.limit.percent:

单个shuffle任务能使用的内存限额,设置为0.15,即为 Shuffle内存 * 0.15。

低于此值可以输出到内存,否则输出到磁盘。

mapreduce.reduce.shuffle.merge.percent:设置为0.9。

shuffle的数据量到Shuffle内存 ** 0.9的时候,启动合并。



版权声明:本文为qq_16320025原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。