浅析Yarn中的关键概念-Container

  • Post author:
  • Post category:其他


初学Yarn的时候,对于Container的概念感觉非常陌生,即便是后期用了很长时间的Yarn平台,依旧觉得对于Container这个概念没有达到非常熟悉的程度:

本文,从源码的角度上来说说,到底什么是Container:

说起来非常容易,Container就是Yarn中的一个动态资源分配的概念,其拥有一定的内存,核数,由RM分配给ApplicationMaster或者MapTask或者ReduceTask,而后,我们需要的程序就在Container为基础的容器中运行起来;

概念非常容易,看了就懂,那么问题来了:


1:Container说是动态资源分配,那么,究竟是怎么个动态分配的法子?


2:MRV1中Map Slot与Reduce Slot的资源往往是不同的,那么,我们的Container在作为Map Task或者Reduce Task运行容器的时候,资源会是一样的吗?


3:对于Map Task或者Reduce Task来说,其究竟是如何从RM处拿到自己需要用的资源的?

我们带着这三个问题,去深入研究一下Yarn内部Container的概念:


这里,我搭建了一个简单的Yarn集群,可以看到,整个集群占用了48GB的内存,占用了48个核:默认情况下,每个NodeManager会提供出8G8核的内存,来供给集群使用:

6个节点的集群,就是如上的内存和机器核数:


我们看下这个,是针对于单个Map Task,针对单个Reduce Task,针对ApplicationMaster可以分配的最大内存,就是无论如何,都不能超过这个大小:分配的Container,最多只能分配这么多资源;同时,也指定了最小资源的范围:

这里,我们可以看到,的确,Container是一个动态的概念,Yarn给其设定了一定的范围,但是对于Task和ApplicationMaster来说,可以自己指定申请资源的大小,只要在Yarn设置的合理范围内均可:

我们先看下,整个集群的初始资源是从何而来:


重点:NodeStatusUpdaterImpl

这是个很重要的类,在NodeManager内部,负责NodeManager的资源报告,我们看下:

int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB,
				YarnConfiguration.DEFAULT_NM_PMEM_MB);
		float vMemToPMem = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
				YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
		int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);

		int virtualCores = conf.getInt(YarnConfiguration.NM_VCORES,
				YarnConfiguration.DEFAULT_NM_VCORES);

		this.totalResource = Resource.newInstance(memoryMb, virtualCores);

从ServiceInit中抄出来的方法,其负责向RM来汇报自己所拥有的资源,我们可以看到,默认情况下,一个NodeManager会贡献出8G8核的资源,这就是集群启动之初的资源来源:

同样,我们也可以修改自己的配置:

public static final String NM_PREFIX = "yarn.nodemanager.";
/** Amount of memory in GB that can be allocated for containers. */
	public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
	public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;

通过该默认配置,我们可以给Yarn集群提供更大的资源,修改该配置,是针对所有的NodeManager资源配置的:

/** Number of Virtual CPU Cores which can be allocated for containers. */
	public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
	public static final int DEFAULT_NM_VCORES = 8;

同样的,通过该配置修改,也可以提高每个NM贡献给整个集群的核数目:


这里,我们明确了一个概念,集群运行之初,每个NM交付了自己可以贡献出来的资源总量,这个资源总量也是可以配置的。

接着,我们看另外一个问题:Map Task与Reduce Task可以使用的资源配置,牵涉到该问题的源码在TaskAttemptImpl中:

private int getMemoryRequired(Configuration conf, TaskType taskType) {
		int memory = 1024;
		if (taskType == TaskType.MAP) {
			memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB,
					MRJobConfig.DEFAULT_MAP_MEMORY_MB);
		} else if (taskType == TaskType.REDUCE) {
			memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
					MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
		}

		return memory;
	}

	private int getCpuRequired(Configuration conf, TaskType taskType) {
		int vcores = 1;
		if (taskType == TaskType.MAP) {
			vcores = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
					MRJobConfig.DEFAULT_MAP_CPU_VCORES);
		} else if (taskType == TaskType.REDUCE) {
			vcores = conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
					MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
		}

		return vcores;
	}

可以看出来,对于MapTask和ReduceTask,沿用了MRV1中的类Slot的机制,但是变得更加灵活,无需事先确定好Map Slot和Reduce Slot的大小,而是可以使用中灵活配置,当然,目前从源码角度上看,集群启动之后,Map Task和Reduce Task拥有的资源量就是固定的了:

接着,我们看看Container的源码,寻找一些蛛丝马迹:

The <code>ResourceManager</code> is the sole authority to allocate any
 * <code>Container</code> to applications. The allocated <code>Container</code>
 * is always on a single node and has a unique {@link ContainerId}. It has a
 * specific amount of {@link Resource} allocated.

在Contaienr的注释中可以看到,Container只能位于一个节点上,因此,其最大资源也就是一个NodeManager的资源大小,这也符合我们在Yarn的web页面看到的资源量的最大和最小概念:

这也很好理解,就Map Task而言,我们可以把任务的Block分配更小,从而降低Map Task运行所需的内存,这比较容易;但是如果想要更大的内存,甚至是一个Container来囊括两个NM甚至是更多NM的资源,是并不容易的:

public static Container newInstance(ContainerId containerId, NodeId nodeId,
			String nodeHttpAddress, Resource resource, Priority priority,
			Token containerToken) {
		Container container = Records.newRecord(Container.class);
		container.setId(containerId);
		container.setNodeId(nodeId);
		container.setNodeHttpAddress(nodeHttpAddress);
		container.setResource(resource);
		container.setPriority(priority);
		container.setContainerToken(containerToken);
		return container;
	}

我们看下这里,通过Container的成员变量来更深刻把握这个概念:

  1. Id:在整个集群中,Id是全局唯一的
  2. NodeId:这也符合了我们的分析,这是NodeManager的ID
  3. Resource:资源量的大小,也就是这次的Container具体会占用多少的资源:
  4. Priority:任务执行的优先性,即本Container优先分配给哪个任务来运行
  5. ContainerToken:使用Container之前,必须拥有该Container的token标记:

我们这里,多看下Resource这个类:

 public static Resource newInstance(int memory, int vCores) {
    Resource resource = Records.newRecord(Resource.class);
    resource.setMemory(memory);
    resource.setVirtualCores(vCores);
    return resource;
  }

很清楚,一个资源实体,概括了Memory和core:

对于Container的概念,理解到该层面就可以了,接下来我们看下,到底Container是如何分配给ApplicationMaster和Map Task以及Reduce Task来运行的?

不过多的分析,我们单独分析下ApplicationMaster对于资源的获取,我们可以从JobImpl出发来看下:

相关代码在该类的makeUberDecision内:

long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
				MRJobConfig.DEFAULT_MR_AM_VMEM_MB);

		long sysCPUSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
				MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);

这里,我们可以看到,默认情况下,给予ApplicationMaster的内存大小是1.5G,给AM默认的核数是1:

介绍到这里,大家大致可以明白了:

说白了,Container只是一个动态资源的概念,其实际的大小,是可以配置的,使用过程中,可以根据任务的不同类型来修改资源用量:



版权声明:本文为u013384984原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。