Apache Hadoop YARN:Client与ResourceManager交互源码解析

  • Post author:
  • Post category:其他



本文主要解析Client<–>ResourceManager之间接口(方法)调用过程(不涉及RPC细节)。


“通用”YARN应用涉及的角色及交互:

RM:ResourceManager

AM:ApplicationMaster

NM:NodeManager


交互中用到的主要通信协议:

ApplicationClientProtocol

ApplicationMasterProtocol

ContainerManagementProtocol


客户端程序与RM进行交互,

Client<–>ResourceManager,通过YarnClient对象来实现。


AM与RM进行交互,

ApplicationMaster<–>ResourceManager,


通过AMRMClientAsync对象来实现,

通过AMRMClientAsync.CallbackHandler异步处理事件信息。


AM与NM进行交互,

ApplicationMaster<–>NodeManager,

通过NMClientAsync对象来实现,主要是启动Container,

通过NMClientAsync.CallbackHandler异步处理Container事件。


接口请求和响应的proto message定义:

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto。


Hadoop版本3.2.1


Flink版本1.10



0、 关键接口和实现类调用关系图


一、 比如Flink集群发布到YARN上的YARN集群描述信息如下

// 起始点是FlinkYarnSessionCli#main
public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);

    private final YarnConfiguration yarnConfiguration;

    // YARN客户端对象YarnClient,此对象的创建位置见后续Flink源码分析
    private final YarnClient yarnClient;
    ...
}



二、 YarnClient部分源码


package org.apache.hadoop.yarn.client.api;

import ...

// 抽象类YarnClient,实现类YarnClientImpl
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {

  // 创建一个新的YarnClient实例
  @Public
  public static YarnClient createYarnClient() {
    // YarnClient的具体实现YarnClientImpl
    YarnClient client = new YarnClientImpl();
    return client;
  }

  @Private
  protected YarnClient(String name) {
    super(name);
  }

  // 创建应用,返回YarnClientApplication对象,主要是里面的ApplicationSubmissionContext和GetNewApplicationResponse两个对象
  public abstract YarnClientApplication createApplication()
      throws YarnException, IOException;

  // 向YARN提交应用,阻塞调用,直到提交成功、被RM接受之后才会返回ApplicationId
  // 这里直接提交的是ApplicationSubmissionContext对象
  public abstract ApplicationId submitApplication(
      ApplicationSubmissionContext appContext) throws YarnException,
      IOException;

  // kill掉指定applicationId的应用
  // 比如 yarn application -kill xxx-xxxx-1
  public abstract void killApplication(ApplicationId applicationId) throws YarnException,
      IOException;

  // 获取指定applicationId的应用报告信息(ApplicationReport有约19个成员)
  // WEB-UI页面能够看到每个应用的相关信息;proto message: ApplicationReportProto
  public abstract ApplicationReport getApplicationReport(ApplicationId appId)
      throws YarnException, IOException;

  // 获取应用的AMRMToken,此Token用于保证AM约RM之间的通信安全
  public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
      getAMRMToken(ApplicationId appId) throws YarnException, IOException;

  // 获取YARN集群中所有应用的应用报告信息
  public abstract List<ApplicationReport> getApplications()
      throws YarnException, IOException;

  // 根据应用类型获取应用报告信息,比如类型:Apache Flink 或者 MAPREDUCE
  // 具体实现(RM的“服务端”)见ClientRMService里的getApplications方法
  public abstract List<ApplicationReport> getApplications(
      Set<String> applicationTypes) throws YarnException, IOException;

  // 根据一个或多个应用状态获取集群中应用的应用报告信息
  // NEW NEW_SAVING SUBMITTED ACCEPTED RUNNING FINISHED FAILED KILLED
  public abstract List<ApplicationReport>
      getApplications(EnumSet<YarnApplicationState> applicationStates)
          throws YarnException, IOException;

  // 根据应用类型和应用状态获取应用的报告信息
  public abstract List<ApplicationReport> getApplications(
      Set<String> applicationTypes,
      EnumSet<YarnApplicationState> applicationStates) throws YarnException,
      IOException;

  // 条件:调度队列、用户、应用类型、应用状态
  public abstract List<ApplicationReport> getApplications(Set<String> queues,
      Set<String> users, Set<String> applicationTypes,
      EnumSet<YarnApplicationState> applicationStates) throws YarnException,
      IOException;

  // 获取YARN集群的(健康等)指标数据
  public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
      IOException;

  // 获取集群节点的报告信息
  // 具体实现(RM的“服务端”)见ClientRMService里的getClusterNodes方法
  public abstract List<NodeReport> getNodeReports(NodeState... states)
      throws YarnException, IOException;

  // 获取代理Token用于与YARN进行安全通信
  // 具体实现(RM的“服务端”)见ClientRMService里的getDelegationToken方法
  public abstract Token getRMDelegationToken(Text renewer)
      throws YarnException, IOException;

  // 获取指定队列名称的队列信息(信息封装在QueueInfo里面,scheduler里面获取)
  // QueueInfoProto
  public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
      IOException;

  // 获取全部队列
  public abstract List<QueueInfo> getAllQueues() throws YarnException, IOException;

  // 获取root(根)队列的信息
  public abstract List<QueueInfo> getRootQueueInfos() throws YarnException, IOException;

  // 获取指定队列名称下的子队列信息
  public abstract List<QueueInfo> getChildQueueInfos(String parent) throws YarnException,
      IOException;

  // 获取队列的ACL权限信息
  public abstract List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
      IOException;

  // 获取指定应用尝试ID对应ApplicationAttempt的报告信息【翻译真别扭】
  public abstract ApplicationAttemptReport getApplicationAttemptReport(
      ApplicationAttemptId applicationAttemptId) throws YarnException, IOException;

  // 获取指定应用的全部ApplicationAttempt的报告信息
  public abstract List<ApplicationAttemptReport> getApplicationAttempts(
      ApplicationId applicationId) throws YarnException, IOException;

  // 获取指定容器ID的Container报告信息
  // $ yarn container -status <Container ID>
  public abstract ContainerReport getContainerReport(ContainerId containerId)
      throws YarnException, IOException;

  // 获取指定应用尝试ID下的Container报告信息
  // $ yarn container -list <Application Attempt ID>
  public abstract List<ContainerReport> getContainers(
      ApplicationAttemptId applicationAttemptId) throws YarnException,
      IOException;

  // 应用还可以在队列之间移动,将指定应用移动到指定的队列中去
  public abstract void moveApplicationAcrossQueues(ApplicationId appId,
      String queue) throws YarnException, IOException;

  // 资源预留机制,注册资源预留;可以更新和删除
  @Public
  @Unstable
  public abstract ReservationSubmissionResponse submitReservation(
      ReservationSubmissionRequest request) throws YarnException, IOException;

  @Public
  @Unstable
  public abstract ReservationUpdateResponse updateReservation(
      ReservationUpdateRequest request) throws YarnException, IOException;

  @Public
  @Unstable
  public abstract ReservationDeleteResponse deleteReservation(
      ReservationDeleteRequest request) throws YarnException, IOException;

  // 获取每个节点对应的节点标签,节点标签用于集群中节点分组
  @Public
  @Unstable
  public abstract Map<NodeId, Set<NodeLabel>> getNodeToLabels()
      throws YarnException, IOException;

  @Public
  @Unstable
  public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes()
      throws YarnException, IOException;

  @Public
  @Unstable
  public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes(
      Set<String> labels) throws YarnException, IOException;

  @Public
  @Unstable
  public abstract List<NodeLabel> getClusterNodeLabels()
      throws YarnException, IOException;                                                                                                                                                    
}



三、 YarnClientImpl部分源码


package org.apache.hadoop.yarn.client.api.impl;

import ...

@Private
@Unstable
public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  // 应用客户端协议 ApplicationClientProtocol
  protected ApplicationClientProtocol rmClient;
  // 此处省略
  ...

  // 服务启动,初始化rmClient
  @Override
  protected void serviceStart() throws Exception {
    try {
      // 底层会调用Proxy.newProxyInstance
      rmClient = ClientRMProxy.createRMProxy(getConfig(),
          ApplicationClientProtocol.class);
      if (historyServiceEnabled) {
        historyClient.start();
      }
      if (timelineServiceEnabled) {
        timelineClient.start();
      }
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
    super.serviceStart();
  }

  // rmClient会去调用ApplicationClientProtocol接口中定义的方法
  // 方法的具体实现见ApplicationClientProtocolPBClientImpl(这里发生RPC调用)
  // 会调用ApplicationClientProtocolPB(RPC代理接口)
  // 方法具体实现ApplicationClientProtocolPBServiceImpl
  // 然后调用RM端的ClientRMService
}



四、 ApplicationClientProtocol部分源码


rmClient会去调用ApplicationClientProtocol接口中定义的方法;

方法的具体实现见

ApplicationClientProtocolPBClientImpl(这里发生RPC调用);

会调用ApplicationClientProtocolPB(RPC代理接口);

方法具体实现ApplicationClientProtocolPBServiceImpl;

然后调用RM端的ClientRMService.

package org.apache.hadoop.yarn.api;

import ...

/**
 * <p>The protocol between clients and the <code>ResourceManager</code>
 * to submit/abort jobs and to get information on applications, cluster metrics,
 * nodes, queues and ACLs.</p> 
 */
@Public
@Stable
public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
  ...

  // 客户端请求获取一个ApplicationId
  // 由RM返回一个单调递增的ApplicationId,由客户端用来注册新应用
  // 除了ApplicationId之外,还有最大可使用资源量maxCapability(Resource)
  // ApplicationId和maxCapability封装在GetNewApplicationResponse里面
  @Public
  @Stable
  @Idempotent
  public GetNewApplicationResponse getNewApplication(
      GetNewApplicationRequest request)
  throws YarnException, IOException;
}



五、 ApplicationClientProtocolPBClientImpl部分源码


package org.apache.hadoop.yarn.api.impl.pb.client;

import ...

@Private
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
    Closeable {

  private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
      ProtobufRpcEngine.class);
    // 此处的proxy会在YarnClient调用start()方法启动时完成初始化
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }
  ...
}



六、 ApplicationClientProtocolPB部分源码


package org.apache.hadoop.yarn.api;

import ...

@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB",
    protocolVersion = 1)
public interface ApplicationClientProtocolPB extends ApplicationClientProtocolService.BlockingInterface {

}



七、 ApplicationClientProtocolPBServiceImpl部分源码



package org.apache.hadoop.yarn.api.impl.pb.service;

import ...

@Private
public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {

  private ApplicationClientProtocol real;

  public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }
  ...
}  



八、 ApplicationClientProtocolService


ApplicationClientProtocolService是定义在

applicationclient_protocol.proto文件中的,包含所有的RPC接口定义,

请求参数和响应都是message XxxProto。

所有的message定义在

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

文件中。

// 生成java类所在的包;如果不需要产生java代码,则该选项将不起任何作用
option java_package = "org.apache.hadoop.yarn.proto";
// 想要生成Java类的名称;如果不生成java代码,则该选项不起任何作用
option java_outer_classname = "ApplicationClientProtocol";
// 编译器是否应该基于服务定义产生抽象服务代码
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "Security.proto";
import "yarn_service_protos.proto";

service ApplicationClientProtocolService {
  rpc getNewApplication (GetNewApplicationRequestProto) returns (GetNewApplicationResponseProto);
  rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
  rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
  rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
  rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
  rpc getApplications (GetApplicationsRequestProto) returns (GetApplicationsResponseProto);
  rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);
  rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
  rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
  rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
  rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
  rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
  rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto);
  rpc getApplicationAttemptReport (GetApplicationAttemptReportRequestProto) returns (GetApplicationAttemptReportResponseProto);
  rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto);
  rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto);
  rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
  rpc submitReservation (ReservationSubmissionRequestProto) returns (ReservationSubmissionResponseProto);
  rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
  rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
  rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
  rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
  rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
}



九、 ClientRMService部分源码


resourcemanager“服务端”的

ClientRMService

实现了ApplicationClientProtocol具体逻辑。

package org.apache.hadoop.yarn.server.resourcemanager;

import ...


/**
 * The client interface to the Resource Manager. This module handles all the rpc
 * interfaces to the resource manager from the client.
 */
public class ClientRMService extends AbstractService implements
    ApplicationClientProtocol {
  // 最终会调用到这里的方法并返回结果
  ...
  @Override
  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
      throws YarnException {
      ...
  }    
  ...
}


【END】


更多内容,可微信搜索

知了小巷

公众号关注,更多干货文章第一时间送达。



版权声明:本文为vori2010原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。