Yarn FairScheduler的抢占机制详解

  • Post author:
  • Post category:其他


FairScheduler是yarn最常用的调度器之一,详细了解它的概念、原理,仅仅靠阅读其官方文档,往往让人模棱两可,须知。Yarn的资源调度影响到整个集群系统的正确运行,因此,必须准确理解每个配置项的准确含义,还需要从代码入手,了解FairScheduler的隐藏在代码中的容易被忽略的逻辑。本文从概念、配置和代码层面,详细分析Yarn的资源调度策略FairScheduler。

Scheduler由ResourceManager进行管理,因此其实之后RM所在节点的队列配置文件才是有效的。


ResourceManager

中scheduler定义为

protected ResourceScheduler scheduler;

Yarn基于树的队列管理逻辑,在资源层面,无论是树的根节点(root 队列),非根节点、叶子节点,都是资源的抽象,在Yarn中,都是一个

Schedulable

,因此,无论是FSLeafQueue(队列树的叶子节点), 还是FSParentQueue(队列树的非叶子节点),或者是FSAppAttempt(FairScheduler调度器层面的应用),是实现了Schedulable的preemptContainer()方法,他们都有自己的fair share属性(资源量)、weight属性(权重)、minShare属性(最小资源量)、maxShare属性(最大资源量),priority属性(优先级)、resourceUsage属性(资源使用量属性)以及资源需求量属性(demand),从Schedulable接口的定义就可以看出来:

public interface Schedulable {
  /**
   * Name of job/queue, used for debugging as well as for breaking ties in
   * scheduling order deterministically.
   */
  public String getName();

  /**
   * Maximum number of resources required by this Schedulable. This is defined as
   * number of currently utilized resources + number of unlaunched resources (that
   * are either not yet launched or need to be speculated).
   */
  public Resource getDemand();

  /** Get the aggregate amount of resources consumed by the schedulable. */
  public Resource getResourceUsage();

  /** Minimum Resource share assigned to the schedulable. */
  public Resource getMinShare();

  /** Maximum Resource share assigned to the schedulable. */
  public Resource getMaxShare();

  /** Job/queue weight in fair sharing. */
  public ResourceWeights getWeights();

  /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
  public long getStartTime();

 /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
  public Priority getPriority();

  /** Refresh the Schedulable's demand and those of its children if any. */
  public void updateDemand();

  /**
   * Assign a container on this node if possible, and return the amount of
   * resources assigned.
   */
  public Resource assignContainer(FSSchedulerNode node);

  /**
   * Preempt a container from this Schedulable if possible.
   */
  public RMContainer preemptContainer();

  /** Get the fair share assigned to this Schedulable. */
  public Resource getFairShare();

  /** Assign a fair share to this Schedulable. */
  public void setFairShare(Resource fairShare);
}

显然,资源调度器是由ResourceManager负责的,因此,ResourceManager在启动的时候会负责读取配置文件,这里,我们配置的是

FairScheduler

      // Initialize the scheduler
      //资源分配模块,负责按照一定规则,将队列资源分配给各个应用程序,默认CapacityScheduler
      scheduler = createScheduler();
      scheduler.setRMContext(rmContext);
      addIfService(scheduler);
      rmContext.setScheduler(scheduler);

在FairScheduler初始化的时候,会产生一个叫做updateThread的deamon线程:

 private void initScheduler(Configuration conf) throws IOException {
    synchronized (this) {
      //省略
      //创建更新线程,负责监控队列的状态并伺机进行抢占
      updateThread = new UpdateThread();
      updateThread.setName("FairSchedulerUpdateThread");
      updateThread.setDaemon(true);
      //省略
  }
 private class UpdateThread extends Thread {

    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(updateInterval);
          long start = getClock().getTime();
          update();
          preemptTasksIfNecessary();
          long duration = getClock().getTime() - start;
          fsOpDurations.addUpdateThreadRunDuration(duration);
        } catch (InterruptedException ie) {
          LOG.warn("Update thread interrupted. Exiting.");
          return;
        } catch (Exception e) {
          LOG.error("Exception in fair scheduler UpdateThread", e);
        }
      }
    }
  }

这个线程负责不断计算集群需要的资源并进行抢占,计算所需资源并抢占,发生在

UpdateThread.preemptTasksIfNecessary()

方法中:

  /**
   * 检查所有缺乏资源的Scheduler, 无论它缺乏资源是因为处于minShare的时间超过了minSharePreemptionTimeout
   * 还是因为它处于fairShare的时间已经超过了fairSharePreemptionTimeout。在统计了所有Scheduler
   * 缺乏的资源并求和以后,就开始尝试进行资源抢占。
   */
  protected synchronized void preemptTasksIfNecessary() {
    if (!shouldAttemptPreemption()) { //检查集群是否允许抢占发生
      return;
    }

    long curTime = getClock().getTime();
    if (curTime - lastPreemptCheckTime < preemptionInterval) {
      return;//还没有到抢占时机,等下一次机会吧
    }
    lastPreemptCheckTime = curTime;

    //初始化抢占参数为none,即什么也不抢占
    Resource resToPreempt = Resources.clone(Resources.none());
    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
      //计算所有叶子队列需要抢占的资源,累加到资源变量resToPreempt中
      Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); 
    }
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
        Resources.none())) { //如果需要抢占的资源大于Resources.none(),即大于0
      preemptResources(resToPreempt);//已经计算得到需要抢占多少资源,那么,下面就开始抢占了
    }
  }


shouldAttemptPreemption()

用来从

整个集群

的层次判断是否应该尝试进行资源抢占,如果整个集群的层次不满足抢占条件,当然就不可以进行抢占:

 private boolean shouldAttemptPreemption() {
    if (preemptionEnabled) {//首先检查配置文件是否打开抢占
      return (preemptionUtilizationThreshold < Math.max(
          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
          (float) rootMetrics.getAllocatedVirtualCores() /
              clusterResource.getVirtualCores()));
    }
    return false;
  }

shouldAttemptPreemption的判断标准主要有两个

  • 是否已经开启了抢占:即

    yarn.scheduler.fair.preemption

    是否配置为true
  • 整体集群资源利用率是否已经超过了

    yarn.scheduler.fair.preemption.cluster-utilization-threshold

    的配置值

如果以上条件均满足,则可以进行抢占相关的工作,包括计算需要抢占的资源,以及进行抢占。


FairSchduler.resToPreempt()

方法用来计算当前的

Schedulable

需要抢占的资源的大小,属于FairScheduler的核心方法,因此我对其进行了详细的注释:

/**
   * 计算这个队列允许抢占其它队列的资源大小。如果这个队列使用的资源低于其最小资源的时间超过了抢占超时时间,那么,
   * 应该抢占的资源量就在它当前的fair share和它的min share之间的差额。如果队列资源已经低于它的fair share
   * 的时间超过了fairSharePreemptionTimeout,那么他应该进行抢占的资源就是满足其fair share的资源总量。
   * 如果两者都发生了,则抢占两个的较多者。
   */
  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
    long minShareTimeout = sched.getMinSharePreemptionTimeout();//minSharePreemptionTimeout
    long fairShareTimeout = sched.getFairSharePreemptionTimeout();//fairSharePreemptionTimeout
    Resource resDueToMinShare = Resources.none();//因为资源低于minShare而需要抢占的资源总量
    Resource resDueToFairShare = Resources.none();//因为资源低于fairShare 而需要抢占的资源总量
    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {//时间超过minSharePreemptionTimeout,则可以判断资源是否低于minShare
         //选取sched.getMinShare()和sched.getDemand()中的较小值,demand代表队列资源需求量,即处于等待或者运行状态下的应用程序尚需的资源量
      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
          sched.getMinShare(), sched.getDemand());
      //选取Resources.none(即0)和 Resources.subtract(target, sched.getResourceUsage())中的较大值,即
      //如果最小资源需求量大于资源使用量,则取其差额,否则,取0,代表minShare已经满足条件,无需进行抢占
      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
    }

    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {// //时间超过fairSharePreemptionTimeout,则可以判断资源是否低于fairShare
        //选取sched.getFairShare()和sched.getDemand()中的较小值,demand代表队列资源需求量,即处于等待或者运行状态下的应用程序尚需的资源量
        //如果需要2G资源,当前的fairshare是2.5G,则需要2.5G
      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
          sched.getFairShare(), sched.getDemand());

      //选取Resources.none(即0)和 Resources.subtract(target, sched.getResourceUsage())中的较大值,即
      //如果fair share需求量大于资源使用量,则取其差额,否则,取0,代表minShare已经满足条件,无需进行抢占
      //再拿2.5G和当前系统已经使用的资源做比较,如果2.5G-usedResource<0, 则使用Resources.none(),即不需要抢占
      //否则,抢占资源量为2.5G-usedResource<0
      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
    }
    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        resDueToMinShare, resDueToFairShare);
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
        resToPreempt, Resources.none())) {
      String message = "Should preempt " + resToPreempt + " res for queue "
          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
          + ", resDueToFairShare = " + resDueToFairShare;
      LOG.info(message);
    }
    return resToPreempt;
  }

根据Yarn的设计,由于资源抢占本身是一种资源的强行剥夺,会带来一定的系统开销。因此,Yarn会在实际抢占发生前,耐心等待一段时间,以尽量直接使用其它应用释放的资源来使用,而尽量避免使用抢占的方式。我叫它

懒抢占


因此,我们在FairScheduler.xml中,需要配置这两个超时时间:


  • minSharePreemptionTimeout

    表示如果超过该指定时间,Scheduler还没有获得minShare的资源,则进行抢占

  • fairSharePreemptionTimeout

    表示如果超过该指定时间,Scheduler还没有获得fairShare的资源,则进行抢占

我们从

resToPreempt()

的代码中可以清楚看到这个抢占规则。

至于什么叫做fairShare以及fairShare的计算方式,可以参考我的另外一篇博文

《Yarn FairSheduler使用FairSharePolicy计算Fair Share的规则、原理和代码实现》

在计算完整体需要进行抢占的资源,下面就可以进行资源抢占了。在这里,请读者务必仔细阅读Yarn关于FairScheduler的官方文档,必须清楚FairScheduler内部定义了不同的Policy决定进行资源抢占的方式,包括fair policy, drf policy以及 fifo policy。默认,是fair policy。

/**
   * 基于已经计算好的需要抢占的资源(toPreempt()方法)进行资源抢占。每一轮抢占,我们从root 队列开始,
   * 一级一级往下进行,直到我们选择了一个候选的application.当然,抢占分优先级进行。
   * 依据每一个队列的policy,抢占方式有所不同。对于fair policy或者drf policy, 会选择超过
   * fair share(这里的fair scheduler都是指Instantaneous Fair Share)
   * 最多的ChildSchedulable进行抢占,但是,如果是fifo policy,则选择最后执行的application进行
   * 抢占。当然,同一个application往往含有多个container,因此同一个application内部container
   * 的抢占也分优先级。
   */
  protected void preemptResources(Resource toPreempt) {
    long start = getClock().getTime();
    if (Resources.equals(toPreempt, Resources.none())) {
      return;
    }
     //warnedContainers,被警告的container,即在前面某轮抢占中被认为满足被强占条件的container 同样,yarn发现一个container满足被抢占规则,绝对不是立刻抢占,而是等待一个超时时间,试图让app自动释放这个container,如果到了超时时间还是没有,那么就可以直接kill了
    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
    //toPreempt代表依旧需要进行抢占的资源
    while (warnedIter.hasNext()) {
      RMContainer container = warnedIter.next();
      if ((container.getState() == RMContainerState.RUNNING ||
              container.getState() == RMContainerState.ALLOCATED) &&
          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
              toPreempt, Resources.none())) {
        warnOrKillContainer(container);
        Resources.subtractFrom(toPreempt, container.getContainer().getResource());//抢占到了一个container,则从toPreempt中去掉这个资源
      } else {
        warnedIter.remove();
      }
    }

    try {
      // Reset preemptedResource for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.resetPreemptedResources();
      }

      //toPreempt代表了目前仍需要抢占的资源,通过不断循环,一轮一轮抢占,toPreempt逐渐减小
      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
          toPreempt, Resources.none())) { //只要还没有达到抢占要求
        RMContainer container =
            getQueueManager().getRootQueue().preemptContainer();
        if (container == null) {
          break;
        } else {
        //找到了一个待抢占的container,同样,警告或者杀死这个container
          warnOrKillContainer(container);
          warnedContainers.add(container);
          //重新计算剩余需要抢占的资源
          Resources.subtractFrom(
              toPreempt, container.getContainer().getResource());
        }
      }
    } finally {
      // Clear preemptedResources for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.clearPreemptedResources();
      }
    }

    long duration = getClock().getTime() - start;
    fsOpDurations.addPreemptCallDuration(duration);
  }

每一轮抢占,都会通过方法

warnOrKillContainer

来检查并处理所有的warnedContainers。

  protected void warnOrKillContainer(RMContainer container) {
    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
    FSAppAttempt app = getSchedulerApp(appAttemptId);
    FSLeafQueue queue = app.getQueue();
    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
        "res=" + container.getContainer().getResource() +
        ") from queue " + queue.getName());

    Long time = app.getContainerPreemptionTime(container);

    if (time != null) {
      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
      // proceed with kill
      //如果这个container在以前已经被标记为需要被抢占,并且时间已经超过了maxWaitTimeBeforeKill,那么这个container可以直接杀死了
      if (time + waitTimeBeforeKill < getClock().getTime()) {
        ContainerStatus status =
          SchedulerUtils.createPreemptedContainerStatus(
            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

        // TODO: Not sure if this ever actually adds this to the list of cleanup
        // containers on the RMNode (see SchedulerNode.releaseContainer()).
        completedContainer(container, status, RMContainerEventType.KILL); //执行清理工作
        LOG.info("Killing container" + container +
            " (after waiting for premption for " +
            (getClock().getTime() - time) + "ms)");
      }
    } else {
        //把这个container标记为可能被抢占,也就是所谓的container警告,在下一轮或者几轮,都会拿出这个container判断是否超过了maxWaitTimeBeforeKill,如果超过了,则可以直接杀死了。
      // track the request in the FSAppAttempt itself
      app.addPreemption(container, getClock().getTime());
    }
  }

从方法名称可以看到,结果有两种:

– 杀死:如果这个container之前已经被标记为

待抢占

,并且距离标记时间已经超过了waitTimeBeforeKill却依然没有被自己的ApplicationMaster主动释放的container(太不自觉),如果是,那么既然在waitTimeBeforeKill之前已经向其主人(ApplicationMaster)发出警告,那么现在FairScheduler失去了耐心,直接杀死这个Container。

– 死期未到:如果这个Container之前已经被标记为抢占,但是距离标记时间还不到waitTimeBeforeKill,那么此次侥幸逃脱,下次再进行判断

– 标记和警告:如果这个container还从来没有被标记为

待抢占

,那么这次就进行标记,记录标记时间,下次updateThread到来,这个container会历经被杀死或者暂时死期未到。


completedContainer(container, status, RMContainerEventType.KILL);

是一个典型的状态机过程,当前发生的事件是

RMContainerEventType.KILL

,即发生

kill

事件,然后

ResourceManager

端的

container

实现

RMContainerImpl

会根据自己的当前状态以及发生的

kill

事件,得出目标状态;关于Hadoop状态机的实现可以参考我的另外一篇博客《Hadoop状态机-高度可重用、可扩展》(建设中),具体的

RMContainerImpl

的状态转换可以看这个类的具体代码。

如果warnedContainer被抢占来的资源依然小于

toPreempt

,那就只好从队列里面选择某些container来抢占,抢占规则,就是队列具体定义的Policy。这段逻辑在

preemptResources()

方法的这段代码里:

  try {
      // Reset preemptedResource for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.resetPreemptedResources();
      }

      //toPreempt代表了目前仍需要抢占的资源,通过不断循环,一轮一轮抢占,toPreempt逐渐减小
      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
          toPreempt, Resources.none())) { //只要还没有达到抢占要求
          //通过具体队列的Policy要求,选择一个container用来被抢占
        RMContainer container =
            getQueueManager().getRootQueue().preemptContainer();  
        if (container == null) {
          break;
        } else {
          warnOrKillContainer(container);
          //将这个container加入到警告列表,以后每一轮都会检查它是否被释放或者抢占,如果超过了一定时间还是没有被抢占或者主动释放,就可以直接kill并抢占了
          warnedContainers.add(container);
          Resources.subtractFrom(
              toPreempt, container.getContainer().getResource());
        }
      }
    } finally {
      // Clear preemptedResources for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.clearPreemptedResources();
      }
    }

我们从

RMContainer container =
            getQueueManager().getRootQueue().preemptContainer();  

看进去,看看具体的Policy是如何决定选择哪个container进行抢占(执行死刑)的。我们选择FairScheduler默认的Policy

FairSharePolicy

来进行分析。分析的过程是,实际上是沿着队列树按照深度优先,逐渐往下遍历直至找到一个container用来抢占。下图示意了yarn的队列树中的层级关系:

Yarn队列中的实体层级
graph TD
A[FSParentQueue] --> B[FSLeafQueue] 
B--> C[Application]
C --> D[Container]
FSParentQueue.preemptContainer()
  /**
   * 从root queue开始,找出一个可以被抢占的container进行抢占。
   * 决策和遍历过程实际上是一个递归调用的过程,从root queue开始,不断
   * 由下级队列决定抢占自己下一级的哪个queue或者application或者container
   * 最终,是由LeafQueue选择一个Application,然后Application选择一个
   * Container
   */
  @Override
  public RMContainer preemptContainer() {
    RMContainer toBePreempted = null;

    // Find the childQueue which is most over fair share
    FSQueue candidateQueue = null;
    Comparator<Schedulable> comparator = policy.getComparator();
    //从自己所有的子队列中选择一个最应该被抢占的队列
    for (FSQueue queue : childQueues) {
      if (candidateQueue == null ||
          comparator.compare(queue, candidateQueue) > 0) {
        candidateQueue = queue;
      }
    }

    // Let the selected queue choose which of its container to preempt
    //选择出来了一个待抢占的队列以后,让这个队列自行决定抢占哪个container,采用**递归**调用的方式
    if (candidateQueue != null) {
      toBePreempted = candidateQueue.preemptContainer();
    }
    return toBePreempted;
  }



FSParentQueue.preemptContainer()

的递归方式来看,寻找被抢占的container的过程,是从队列树的root queue开始,采用

深度优先

的方式进行!

FairSharePolicy使用的资源比较器是

DefaultResourceCalculator

,从DefaultResourceCalculator中不难看出,进行资源大小的比较时,只考虑了memory,没有考虑vCore。

因此,我们来看

FSLeafQueue.preemptContainer()

,LeafQueue的意思是,下面没有子队列。

FSLeafQueue.preemptContainer()
@Override
  public RMContainer preemptContainer() {
    RMContainer toBePreempted = null;

    // If this queue is not over its fair share, reject
    if (!preemptContainerPreCheck()) {
      return toBePreempted;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Queue " + getName() + " is going to preempt a container " +
          "from its applications.");
    }

    // Choose the app that is most over fair share
    Comparator<Schedulable> comparator = policy.getComparator();
    FSAppAttempt candidateSched = null;
    readLock.lock();
    try {
      //从该叶子队列中的所有application中,选择一个更应该被强占的application
      //如果使用默认Policy FairSharePolicy,那么选择标准就是该Application当前资源
      //的欠缺或者充裕程度,资源越充裕,越可能被选中
      for (FSAppAttempt sched : runnableApps) {
        if (candidateSched == null ||
            comparator.compare(sched, candidateSched) > 0) {
          candidateSched = sched;
        }
      }
    } finally {
      readLock.unlock();
    }

    // Preempt from the selected app
    if (candidateSched != null) {
      //由于是叶子队列,因此candidateSched肯定是一个APP,即FSAppAttempt对象
      toBePreempted = candidateSched.preemptContainer();
    }
    return toBePreempted;
  }

FSLeafQueue与FSParentQueue的抢占逻辑是几乎相同的,都是通过递归遍历进行深度优先遍历,唯一的区别,就是FSParentQueue的child是FSParentQueue或者FSLeafQueue,而FSLeafQueue的child是FSAppAttemtp。因此,最重要的,我们来看FSAppAttempt.preemptContainer():

FSAppAttempt.preemptContainer()
  /**
   * 根据优先级,从application的所有container中选择一个container用来被抢占
   */
  @Override
  public RMContainer preemptContainer() {
    //省略
    RMContainer toBePreempted = null;
    //获取自己所有的running container
    for (RMContainer container : getLiveContainers()) {
    //使用比较器RMContainerComparator选择出一个最应该被抢占的container
      if (!getPreemptionContainers().contains(container) &&
          (toBePreempted == null ||
              comparator.compare(toBePreempted, container) > 0)) {
        toBePreempted = container;
      }
    }
    return toBePreempted;
  }
}

FSAppAttempt用来决定自己的哪个container拿出来被抢占,采用的是比较器

RMContainerComparator

,这个比较器代码简单,贴出来:

  static class RMContainerComparator implements Comparator<RMContainer>,
      Serializable {
    @Override
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(
          c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  }

可见,规则就是比较优先级,选择一个优先级较低的container,如果优先级相同,则比较containerId并选择一个id比较小的container。

在这里我们可以看到Yarn队列设计的精妙之处。无论是parentQueue , 还是leaf queue ,或者是Application,虽然处在一个tree的不同level,但是他们的性质是一样的,都被抽象为

Schedulable

,因此都需要实现

preemptContainer()

方法。在决定哪个container被抢占的时候,就可以

递归

进行,ParentQueue交给下面的leaf Queue 或者 下面的ParentQueue决定,而LeafQueue则交给下面的Application决定,Applactiion则根据container的优先级,决定哪个container被抢占。

从以上代码的整体逻辑可以看到,yarn进行资源抢占,在计算需要抢占多少资源的时候,是

从整个yarn集群的范围内

进行计算的,而

不是为了满足某一个application的资源而为了它进行单独的抢占

Yarn如果配置了资源抢占,很容易发生长任务被饿死的情况。我之前遇到的情况,在配置资源抢占之前,所有队列的maxResource之和等于集群总资源,且不允许抢占。显然,在这种配置情况下,集群总体资源利用率不高,即使是在半夜最繁忙(许多定时任务用来统计前一天的业务数据,因此半夜会比较集中地运行)的时候,由于存在几个队列没有任务运行,集群资源利用率最高也只能达70%。于是,我们打开抢占,目的是提高集群整体资源利用率,预期结果是大部分任务的执行时间缩短。的确,抢占打开以后,集群整体资源利用率能够升至90%以上。但是,第二天过来,发现大量任务尤其是长任务出现严重delay。集群整体资源利用率的确提高,为什么大量任务延迟呢?

为了解决问题,我们尝试了一下办法:

考虑到有可能过度并行导致此问题,我们对每个队列中application的并行程度进行限制:


  • 限制ApplicationMaster资源占比

    :对每个队列的amShare参数进行调整,目的是为了限制每个队列运行的应用数目

  • 限制队列可运行应用数目

    :直接通过maxRunningApps限制队列可运行应用数目

  • 排除内存资源问题

    :由于我们的Yarn集群同时运行了hdfs、zookeeper、kafka等组件,我们给Yarn的每一个NodeManager配置的内存为物理内存的80%,即留下20%的物理内存给操作系统本身以及其他组件运行。我们怀疑到了晚上某些时刻其他组件占用过多内存,导致Yarn运行某些container的之后无法正常向操作系统申请内存。通过zabbix观察剩余内存,排除该原因。

  • 排除CPU以及IO问题

    :由于我们使用基于Fair Policy的FairScheduler,资源调度只考虑到内存,因此,我们怀疑打开抢占以后,由于系统运行的app变多,内存之外的资源,比如CPU和IO制约应用运行。同样,我们通过zabbix进行观察,虽然在最繁忙时段系统CPU和IO负载的确升高,但是总体并未出现严重延迟。

最终,我们选取了某一个spark批处理任务进行详细分析,发现问题:该任务的多个task,均多次出现执行过程中被抢占。一个正常执行时间为30分钟的任务,前四次均执行到15分钟的时候被抢占和kill掉,最后一次成功,因此执行时间达到90分钟。因此,取消抢占,最终问题解决。因此,在进行yarn的队列资源配置的时候,一定要将运行时间较长的任务(例如spark streaming任务、或者一个task的执行时间可能比较产)和执行时间比较短的任务放在不同的队列中。同时,慎重使用抢占。其实,Yarn的队列配置对minResource有硬性限制,即所有队列的minResource之和不可以超过集群总资源,但是,没有哪里告诉我们说maxResource之和必须等于或者小于集群资源之和。只要我们合理配置maxResource,即使不打开抢占,在适当并行情况下,ResourceManager完全可以将一个queue的已经运行完成的container回收然后分配给另外一个queue,只不过,在任何情况下,任何队列的资源使用都不会超过maxResource。



版权声明:本文为zhanyuanlin原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。