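This article walks through the interface (method) call path between the Client and the ResourceManager (Client<–>ResourceManager), without going into RPC details.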
Roles involved in a "generic" YARN application and how they interact:
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
Main communication protocols used in these interactions:
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
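The client program interacts with the RM (Client<–>ResourceManager) through a YarnClient object.
The AM interacts with the RM (ApplicationMaster<–>ResourceManager) through an AMRMClientAsync object, and handles events asynchronously through AMRMClientAsync.CallbackHandler.
The AM interacts with the NM (ApplicationMaster<–>NodeManager) through an NMClientAsync object, mainly to start Containers, and handles Container events asynchronously through NMClientAsync.CallbackHandler.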
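The three protocols and their client-side wrappers can be condensed into a lookup table. The enum below is a hypothetical summary written for this article, not a Hadoop class. Its strings are simply the names used in the text above.

```java
// Hypothetical summary table (not a Hadoop class): who calls whom over each
// protocol, and which client-side wrapper hides the protocol from user code.
enum YarnProtocolRole {
    APPLICATION_CLIENT("ApplicationClientProtocol", "Client", "ResourceManager", "YarnClient"),
    APPLICATION_MASTER("ApplicationMasterProtocol", "ApplicationMaster", "ResourceManager", "AMRMClientAsync"),
    CONTAINER_MANAGEMENT("ContainerManagementProtocol", "ApplicationMaster", "NodeManager", "NMClientAsync");

    final String protocol;
    final String caller;
    final String callee;
    final String clientWrapper;

    YarnProtocolRole(String protocol, String caller, String callee, String clientWrapper) {
        this.protocol = protocol;
        this.caller = caller;
        this.callee = callee;
        this.clientWrapper = clientWrapper;
    }

    // Returns the protocol used between the two parties, or null if none of the
    // three protocols connects them.
    static YarnProtocolRole between(String caller, String callee) {
        for (YarnProtocolRole role : values()) {
            if (role.caller.equals(caller) && role.callee.equals(callee)) {
                return role;
            }
        }
        return null;
    }
}
```

For example, `YarnProtocolRole.between("Client", "ResourceManager").protocol` is `"ApplicationClientProtocol"`. `between("Client", "NodeManager")` returns null, because none of the protocols listed above connects those two parties.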
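Proto message definitions for the requests and responses: the request/response messages themselves (e.g. GetNewApplicationRequestProto) are in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto. The records they carry (e.g. ApplicationReportProto) are in
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto.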
Hadoop version: 3.2.1
Flink version: 1.10
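0. Call-relationship diagram of the key interfaces and implementation classes
1. Example: the YARN cluster descriptor Flink uses when deploying a Flink cluster on YARN
// Entry point: FlinkYarnSessionCli#main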
public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
private final YarnConfiguration yarnConfiguration;
// The YARN client object (YarnClient); where it is created is covered in the later Flink source analysis
private final YarnClient yarnClient;
...
}
2. YarnClient (partial source)
package org.apache.hadoop.yarn.client.api;
import ...
// Abstract class YarnClient; its implementation class is YarnClientImpl
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {
// Create a new YarnClient instance
@Public
public static YarnClient createYarnClient() {
// The concrete implementation of YarnClient is YarnClientImpl
YarnClient client = new YarnClientImpl();
return client;
}
@Private
protected YarnClient(String name) {
super(name);
}
// Create an application; returns a YarnClientApplication, which mainly wraps an ApplicationSubmissionContext and a GetNewApplicationResponse
public abstract YarnClientApplication createApplication()
throws YarnException, IOException;
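// Submit an application to YARN. This is a blocking call: it returns the ApplicationId only after the application has been submitted successfully and accepted by the RM
// Note that what gets submitted is the ApplicationSubmissionContext object itself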
public abstract ApplicationId submitApplication(
ApplicationSubmissionContext appContext) throws YarnException,
IOException;
// Kill the application with the given applicationId
// e.g. yarn application -kill xxx-xxxx-1
public abstract void killApplication(ApplicationId applicationId) throws YarnException,
IOException;
// Get the report for the application with the given applicationId (ApplicationReport has about 19 fields)
// This is the per-application information shown in the web UI; proto message: ApplicationReportProto
public abstract ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException;
// Get the application's AMRMToken, which secures communication between the AM and the RM
public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
getAMRMToken(ApplicationId appId) throws YarnException, IOException;
// Get reports for all applications in the YARN cluster
public abstract List<ApplicationReport> getApplications()
throws YarnException, IOException;
// Get application reports filtered by application type, e.g. Apache Flink or MAPREDUCE
// Server-side (RM) implementation: getApplications in ClientRMService
public abstract List<ApplicationReport> getApplications(
Set<String> applicationTypes) throws YarnException, IOException;
// Get reports for the applications in the cluster that are in one or more of the given states
// NEW NEW_SAVING SUBMITTED ACCEPTED RUNNING FINISHED FAILED KILLED
public abstract List<ApplicationReport>
getApplications(EnumSet<YarnApplicationState> applicationStates)
throws YarnException, IOException;
// Get application reports filtered by both application type and application state
public abstract List<ApplicationReport> getApplications(
Set<String> applicationTypes,
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException;
// Filters: scheduler queue, user, application type, application state
public abstract List<ApplicationReport> getApplications(Set<String> queues,
Set<String> users, Set<String> applicationTypes,
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException;
// Get YARN cluster metrics (node health counts and similar figures)
public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
IOException;
// Get reports for the cluster's nodes
// Server-side (RM) implementation: getClusterNodes in ClientRMService
public abstract List<NodeReport> getNodeReports(NodeState... states)
throws YarnException, IOException;
// Get an RM delegation token for secure communication with YARN
// Server-side (RM) implementation: getDelegationToken in ClientRMService
public abstract Token getRMDelegationToken(Text renewer)
throws YarnException, IOException;
// Get information about the queue with the given name (wrapped in QueueInfo and obtained from the scheduler)
// proto message: QueueInfoProto
public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
IOException;
// Get all queues
public abstract List<QueueInfo> getAllQueues() throws YarnException, IOException;
// Get information about the root (top-level) queues
public abstract List<QueueInfo> getRootQueueInfos() throws YarnException, IOException;
// Get information about the child queues of the given parent queue
public abstract List<QueueInfo> getChildQueueInfos(String parent) throws YarnException,
IOException;
// Get the current user's ACL information for the queues
public abstract List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
IOException;
// Get the report for the ApplicationAttempt with the given application attempt ID
public abstract ApplicationAttemptReport getApplicationAttemptReport(
ApplicationAttemptId applicationAttemptId) throws YarnException, IOException;
// Get reports for all ApplicationAttempts of the given application
public abstract List<ApplicationAttemptReport> getApplicationAttempts(
ApplicationId applicationId) throws YarnException, IOException;
// Get the report for the container with the given container ID
// $ yarn container -status <Container ID>
public abstract ContainerReport getContainerReport(ContainerId containerId)
throws YarnException, IOException;
// Get reports for the containers of the given application attempt
// $ yarn container -list <Application Attempt ID>
public abstract List<ContainerReport> getContainers(
ApplicationAttemptId applicationAttemptId) throws YarnException,
IOException;
// Applications can also be moved between queues: move the given application to the given queue
public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException;
// Reservation system: submit a resource reservation, which can later be updated or deleted
@Public
@Unstable
public abstract ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException;
@Public
@Unstable
public abstract ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException;
@Public
@Unstable
public abstract ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException;
// Get the node labels of each node; node labels are used to partition the cluster's nodes into groups
@Public
@Unstable
public abstract Map<NodeId, Set<NodeLabel>> getNodeToLabels()
throws YarnException, IOException;
@Public
@Unstable
public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes()
throws YarnException, IOException;
@Public
@Unstable
public abstract Map<NodeLabel, Set<NodeId>> getLabelsToNodes(
Set<String> labels) throws YarnException, IOException;
@Public
@Unstable
public abstract List<NodeLabel> getClusterNodeLabels()
throws YarnException, IOException;
}
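The comment on submitApplication says the call blocks until the RM has accepted the application. Below is a minimal sketch of that submit-then-poll pattern. It uses a hypothetical in-memory FakeRm instead of the real ApplicationClientProtocol. The waiting-state set and the FAILED/KILLED check are assumptions modelled on that comment, not code copied from YarnClientImpl.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmitAndWaitSketch {

    // State names taken from the YarnApplicationState list in the YarnClient comments.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Hypothetical stand-in for the RM side of ApplicationClientProtocol.
    interface FakeRm {
        void submit(String appId);
        AppState getState(String appId);
    }

    // Assumption: the app counts as "accepted" once it has left these states.
    static final Set<AppState> WAITING =
        EnumSet.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED);

    // Blocking submit: send the request once, then poll the application's state
    // until it leaves the waiting states, fails, or the timeout expires.
    static AppState submitAndWait(FakeRm rm, String appId, long pollMillis, long timeoutMillis) {
        rm.submit(appId);
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            AppState state = rm.getState(appId);
            if (state == AppState.FAILED || state == AppState.KILLED) {
                throw new IllegalStateException(appId + " failed to be submitted: " + state);
            }
            if (!WAITING.contains(state)) {
                return state;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("timed out waiting for " + appId);
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for " + appId, e);
            }
        }
    }

    // Toy RM that moves an app one step along NEW -> NEW_SAVING -> SUBMITTED -> ACCEPTED
    // each time its state is queried.
    static final class SteppingRm implements FakeRm {
        private static final AppState[] PATH =
            { AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED, AppState.ACCEPTED };
        private final Map<String, AtomicInteger> polls = new ConcurrentHashMap<>();

        @Override public void submit(String appId) { polls.put(appId, new AtomicInteger()); }

        @Override public AppState getState(String appId) {
            int step = polls.get(appId).getAndIncrement();
            return PATH[Math.min(step, PATH.length - 1)];
        }

        int pollCount(String appId) { return polls.get(appId).get(); }
    }
}
```

With the SteppingRm, `submitAndWait(rm, "app-1", 1, 5000)` returns `ACCEPTED` after four state queries. Here "app-1" is a made-up ID.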
3. YarnClientImpl (partial source)
package org.apache.hadoop.yarn.client.api.impl;
import ...
@Private
@Unstable
public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
// The client protocol ApplicationClientProtocol
protected ApplicationClientProtocol rmClient;
// (further fields omitted)
...
// Service start: initialize rmClient
@Override
protected void serviceStart() throws Exception {
try {
// Under the hood this ends up calling Proxy.newProxyInstance
rmClient = ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
if (historyServiceEnabled) {
historyClient.start();
}
if (timelineServiceEnabled) {
timelineClient.start();
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
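// rmClient invokes the methods defined in the ApplicationClientProtocol interface
// Client-side implementation: ApplicationClientProtocolPBClientImpl (this is where the RPC call happens)
// which calls through ApplicationClientProtocolPB (the RPC proxy interface)
// Server-side implementation: ApplicationClientProtocolPBServiceImpl
// which then calls ClientRMService on the RM side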
}
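YarnClientImpl creates rmClient only inside serviceStart(). A YarnClient therefore has to go through init() and start() before any RM call. The sketch below models that lifecycle with a simplified MiniService whose state names mirror Hadoop's Service.STATE. MiniService and MiniYarnClient are hypothetical. They leave out the configuration, listeners, and failure handling of the real AbstractService.

```java
import java.util.function.Supplier;

final class ServiceLifecycleSketch {

    // Same state names as Hadoop's Service.STATE.
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    // Simplified stand-in for AbstractService: enforces init -> start -> stop
    // ordering and delegates the real work to the serviceXxx() hooks.
    abstract static class MiniService {
        private State state = State.NOTINITED;

        final synchronized void init() {
            require(State.NOTINITED, "init");
            serviceInit();
            state = State.INITED;
        }

        final synchronized void start() {
            require(State.INITED, "start");
            serviceStart();
            state = State.STARTED;
        }

        final synchronized void stop() {
            if (state == State.STOPPED) {
                return; // stopping twice is a no-op
            }
            serviceStop();
            state = State.STOPPED;
        }

        final synchronized State state() { return state; }

        private void require(State expected, String action) {
            if (state != expected) {
                throw new IllegalStateException("cannot " + action + " in state " + state);
            }
        }

        protected void serviceInit() {}
        protected void serviceStart() {}
        protected void serviceStop() {}
    }

    // Hypothetical client whose RM proxy only exists after start(), like rmClient in YarnClientImpl.
    static final class MiniYarnClient extends MiniService {
        private final Supplier<Runnable> proxyFactory;
        private Runnable rmClient; // stands in for the ApplicationClientProtocol proxy

        MiniYarnClient(Supplier<Runnable> proxyFactory) { this.proxyFactory = proxyFactory; }

        @Override protected void serviceStart() { rmClient = proxyFactory.get(); }

        @Override protected void serviceStop() { rmClient = null; }

        void callRm() {
            if (rmClient == null) {
                throw new IllegalStateException("client not started");
            }
            rmClient.run();
        }
    }
}
```

With the real API, the corresponding sequence is YarnClient.createYarnClient(), then init(conf) and start() before the first RM call, and stop() when the client is no longer needed.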
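4. ApplicationClientProtocol (partial source)
rmClient invokes the methods defined in the ApplicationClientProtocol interface.
On the client side, these methods are implemented by
ApplicationClientProtocolPBClientImpl (this is where the RPC call happens),
which calls through ApplicationClientProtocolPB (the RPC proxy interface).
On the server side, ApplicationClientProtocolPBServiceImpl implements that interface
and in turn calls ClientRMService on the RM.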
package org.apache.hadoop.yarn.api;
import ...
/**
* <p>The protocol between clients and the <code>ResourceManager</code>
* to submit/abort jobs and to get information on applications, cluster metrics,
* nodes, queues and ACLs.</p>
*/
@Public
@Stable
public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
...
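// The client requests a new ApplicationId
// The RM returns a new, monotonically increasing ApplicationId, which the client uses to submit a new application
// Besides the ApplicationId, the RM also returns the maximum resource capability maxCapability (a Resource)
// Both the ApplicationId and maxCapability are wrapped in GetNewApplicationResponse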
@Public
@Stable
@Idempotent
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request)
throws YarnException, IOException;
}
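getNewApplication is described as returning a monotonically increasing ApplicationId. A YARN ApplicationId pairs the RM's cluster timestamp with a sequence number. One way to model this is an atomic counter combined with a fixed timestamp. AppId and AppIdGenerator below are hypothetical simplifications. The zero-padded `application_<timestamp>_<id>` string form is an assumption based on how YARN application IDs are usually printed.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class AppIdSketch {

    // Immutable (clusterTimestamp, id) pair, modelled on YARN's ApplicationId.
    static final class AppId implements Comparable<AppId> {
        final long clusterTimestamp;
        final int id;

        AppId(long clusterTimestamp, int id) {
            this.clusterTimestamp = clusterTimestamp;
            this.id = id;
        }

        // Assumed textual form: application_<clusterTimestamp>_<id padded to 4 digits>.
        @Override public String toString() {
            return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
        }

        // Order by timestamp first, then by sequence number.
        @Override public int compareTo(AppId other) {
            int c = Long.compare(clusterTimestamp, other.clusterTimestamp);
            return c != 0 ? c : Integer.compare(id, other.id);
        }
    }

    // Hypothetical RM-side generator: one timestamp per RM start plus one atomic
    // counter, so the ids it hands out only ever increase, even under concurrency.
    static final class AppIdGenerator {
        private final long clusterTimestamp;
        private final AtomicInteger counter = new AtomicInteger();

        AppIdGenerator(long clusterTimestamp) { this.clusterTimestamp = clusterTimestamp; }

        AppId next() { return new AppId(clusterTimestamp, counter.incrementAndGet()); }
    }
}
```

With `new AppIdGenerator(1700000000000L)`, the first call to `next()` prints as `application_1700000000000_0001`. The timestamp is made up. In this model, pairing the counter with the RM start timestamp keeps IDs distinct across RM restarts, even though the in-memory counter starts again from zero.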
5. ApplicationClientProtocolPBClientImpl (partial source)
package org.apache.hadoop.yarn.api.impl.pb.client;
import ...
@Private
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
Closeable {
private ApplicationClientProtocolPB proxy;
public ApplicationClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
ProtobufRpcEngine.class);
// This proxy is initialized when YarnClient.start() is called (see YarnClientImpl#serviceStart)
proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
}
...
}
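The proxy returned by RPC.getProxy is a client-side stub: code calls ordinary interface methods on it, and a handler behind it turns each call into a request. The YarnClientImpl comment above notes that this is built on the JDK's Proxy.newProxyInstance. Below is a minimal sketch of that mechanism. The InvocationHandler records each call and then forwards it in-process via reflection, whereas a real RPC engine would serialize the call and send it over the network. EchoProtocol and LocalEchoServer are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;

final class DynamicProxySketch {

    // Hypothetical protocol, used only for this demo.
    interface EchoProtocol {
        String echo(String message);
        int add(int a, int b);
    }

    // Plays the role of the remote (server-side) implementation.
    static final class LocalEchoServer implements EchoProtocol {
        @Override public String echo(String message) { return "echo:" + message; }
        @Override public int add(int a, int b) { return a + b; }
    }

    // Builds a client-side stub for `iface`. Each call is logged by method name
    // (where a real RPC engine would serialize it) and then dispatched reflectively
    // to `server`, which stands in for the remote side.
    static <T> T newStub(Class<T> iface, T server, List<String> callLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            callLog.add(method.getName());
            try {
                return method.invoke(server, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the server's own exception, not the reflection wrapper
            }
        };
        Object stub = Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(stub);
    }
}
```

Callers see only `EchoProtocol`. Swapping the handler's body for real serialization and a socket is what turns this into an RPC client, and the calling code does not change.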
6. ApplicationClientProtocolPB (partial source)
package org.apache.hadoop.yarn.api;
import ...
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationClientProtocolPB",
protocolVersion = 1)
public interface ApplicationClientProtocolPB extends ApplicationClientProtocolService.BlockingInterface {
}
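The @ProtocolInfo annotation gives the PB interface a stable protocol name and version that the RPC layer reads at runtime. Hadoop's RPC.getProtocolName falls back to the class name when the annotation is absent. The sketch below shows that lookup with a hypothetical DemoProtocolInfo look-alike rather than Hadoop's own annotation. Its version fallback of -1 is an invented convention.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class ProtocolInfoSketch {

    // Hypothetical look-alike of @ProtocolInfo; it must be retained at runtime
    // so that it can be read reflectively.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoProtocolInfo {
        String protocolName();
        long protocolVersion() default -1;
    }

    @DemoProtocolInfo(protocolName = "org.example.DemoProtocolPB", protocolVersion = 1)
    interface AnnotatedProtocol {}

    interface PlainProtocol {}

    // Name used on the wire: the annotated name if present, otherwise the class name.
    static String protocolName(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        return info != null ? info.protocolName() : protocol.getName();
    }

    // Annotated version if present; -1 otherwise (hypothetical fallback).
    static long protocolVersion(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        return info != null ? info.protocolVersion() : -1;
    }
}
```

Decoupling the wire name from the Java class name means the interface can be renamed or moved without breaking clients and servers that agree on the annotated name.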
7. ApplicationClientProtocolPBServiceImpl (partial source)
package org.apache.hadoop.yarn.api.impl.pb.service;
import ...
@Private
public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {
private ApplicationClientProtocol real;
public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
this.real = impl;
}
...
}
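Sections 4 to 7 describe a layered chain:
- YarnClientImpl calls ApplicationClientProtocol.
- ApplicationClientProtocolPBClientImpl converts the Java request objects into protobuf messages and passes them to the ApplicationClientProtocolPB proxy.
- On the RM, ApplicationClientProtocolPBServiceImpl converts them back and calls the `real` implementation, ClientRMService.

The sketch below wires hypothetical stand-ins for each layer in one process. The string "wire format", the 8192 MB capability, and all class names are invented for illustration, and the network hop is replaced by a direct method call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

final class CallChainSketch {

    // Java-side records, standing in for GetNewApplicationRequest/Response.
    static final class NewAppRequest {}

    static final class NewAppResponse {
        final int appId;
        final int maxMemoryMb;
        NewAppResponse(int appId, int maxMemoryMb) {
            this.appId = appId;
            this.maxMemoryMb = maxMemoryMb;
        }
    }

    // Stand-in for ApplicationClientProtocol: the interface seen by both the
    // client (rmClient) and the server (ClientRMService).
    interface ClientProtocol {
        NewAppResponse getNewApplication(NewAppRequest request);
    }

    // Plays ClientRMService: the real logic, here just a counter and a fixed capability.
    static final class FakeClientRMService implements ClientProtocol {
        private final AtomicInteger counter = new AtomicInteger();
        @Override public NewAppResponse getNewApplication(NewAppRequest request) {
            return new NewAppResponse(counter.incrementAndGet(), 8192);
        }
    }

    // Plays ...PBServiceImpl: decodes the wire message, calls the wrapped real
    // implementation, and encodes the reply. The "proto" is just a string here.
    static final class FakePBServiceImpl implements UnaryOperator<String> {
        private final ClientProtocol real;
        FakePBServiceImpl(ClientProtocol real) { this.real = real; }
        @Override public String apply(String wireRequest) {
            if (!"GetNewApplicationRequestProto{}".equals(wireRequest)) {
                throw new IllegalArgumentException("unknown request: " + wireRequest);
            }
            NewAppResponse r = real.getNewApplication(new NewAppRequest());
            return r.appId + "," + r.maxMemoryMb;
        }
    }

    // Plays ...PBClientImpl: implements the Java interface by encoding the request,
    // passing it to the proxy (a network hop in Hadoop, a direct call here), and
    // decoding the reply back into a Java object.
    static final class FakePBClientImpl implements ClientProtocol {
        private final UnaryOperator<String> proxy;
        FakePBClientImpl(UnaryOperator<String> proxy) { this.proxy = proxy; }
        @Override public NewAppResponse getNewApplication(NewAppRequest request) {
            String[] fields = proxy.apply("GetNewApplicationRequestProto{}").split(",");
            return new NewAppResponse(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]));
        }
    }

    // client -> PB client -> "transport" -> PB service -> ClientRMService
    static ClientProtocol connect() {
        return new FakePBClientImpl(new FakePBServiceImpl(new FakeClientRMService()));
    }
}
```

Both ends implement the same Java interface, so YarnClientImpl and ClientRMService never see protobuf types. Only the two PB adapter classes deal with the message format.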
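8. ApplicationClientProtocolService
ApplicationClientProtocolService is defined in
applicationclient_protocol.proto, which contains all the RPC method definitions of this protocol;
every request and response is a message named XxxProto.
The request/response messages are defined in yarn_service_protos.proto (imported below), and the records they carry, such as ApplicationReportProto and QueueInfoProto, are defined in
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto.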
// Java package for the generated classes; has no effect if no Java code is generated
option java_package = "org.apache.hadoop.yarn.proto";
// Name of the generated outer Java class; has no effect if no Java code is generated
option java_outer_classname = "ApplicationClientProtocol";
// Whether the compiler should generate abstract service code from the service definition
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "Security.proto";
import "yarn_service_protos.proto";
service ApplicationClientProtocolService {
rpc getNewApplication (GetNewApplicationRequestProto) returns (GetNewApplicationResponseProto);
rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
rpc getApplications (GetApplicationsRequestProto) returns (GetApplicationsResponseProto);
rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);
rpc getQueueInfo (GetQueueInfoRequestProto) returns (GetQueueInfoResponseProto);
rpc getQueueUserAcls (GetQueueUserAclsInfoRequestProto) returns (GetQueueUserAclsInfoResponseProto);
rpc getDelegationToken(hadoop.common.GetDelegationTokenRequestProto) returns (hadoop.common.GetDelegationTokenResponseProto);
rpc renewDelegationToken(hadoop.common.RenewDelegationTokenRequestProto) returns (hadoop.common.RenewDelegationTokenResponseProto);
rpc cancelDelegationToken(hadoop.common.CancelDelegationTokenRequestProto) returns (hadoop.common.CancelDelegationTokenResponseProto);
rpc moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequestProto) returns (MoveApplicationAcrossQueuesResponseProto);
rpc getApplicationAttemptReport (GetApplicationAttemptReportRequestProto) returns (GetApplicationAttemptReportResponseProto);
rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto);
rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto);
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
rpc submitReservation (ReservationSubmissionRequestProto) returns (ReservationSubmissionResponseProto);
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
}
9. ClientRMService (partial source)
On the ResourceManager "server side",
ClientRMService
implements the actual logic of ApplicationClientProtocol.
package org.apache.hadoop.yarn.server.resourcemanager;
import ...
/**
* The client interface to the Resource Manager. This module handles all the rpc
* interfaces to the resource manager from the client.
*/
public class ClientRMService extends AbstractService implements
ApplicationClientProtocol {
// Every client call eventually lands on a method here, which computes and returns the result
...
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException {
...
}
...
}
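The YarnClient comments note that the getApplications(...) overloads are served by ClientRMService#getApplications. To illustrate how their filters (queue, user, application type, state) combine, here is a hypothetical in-memory version. It treats an empty set as "no filter" and matches application types case-insensitively. Both choices are assumptions for this sketch, not verified details of ClientRMService. Report is a trimmed-down stand-in for ApplicationReport.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class AppFilterSketch {

    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Hypothetical, heavily trimmed application report.
    static final class Report {
        final String name, type, queue, user;
        final AppState state;
        Report(String name, String type, String queue, String user, AppState state) {
            this.name = name;
            this.type = type;
            this.queue = queue;
            this.user = user;
            this.state = state;
        }
    }

    // Mirrors the widest overload (queues, users, types, states).
    // Assumptions: an empty set means "no filter"; types compare case-insensitively.
    static List<Report> getApplications(List<Report> all, Set<String> queues, Set<String> users,
                                        Set<String> types, Set<AppState> states) {
        Set<String> wantedTypes = new HashSet<>();
        for (String t : types) {
            wantedTypes.add(t.toLowerCase(Locale.ROOT));
        }
        List<Report> result = new ArrayList<>();
        for (Report r : all) {
            if (!queues.isEmpty() && !queues.contains(r.queue)) continue;
            if (!users.isEmpty() && !users.contains(r.user)) continue;
            if (!wantedTypes.isEmpty() && !wantedTypes.contains(r.type.toLowerCase(Locale.ROOT))) continue;
            if (!states.isEmpty() && !states.contains(r.state)) continue;
            result.add(r);
        }
        return result;
    }

    // Mirrors getApplications(applicationTypes, applicationStates): no queue or user filter.
    static List<Report> getApplications(List<Report> all, Set<String> types, Set<AppState> states) {
        return getApplications(all, Collections.<String>emptySet(), Collections.<String>emptySet(),
                               types, states);
    }
}
```

Every filter is applied as an AND, so asking for type "apache flink" and state RUNNING returns only applications that satisfy both conditions.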
[END]
For more content, search WeChat for the official account
知了小巷
and follow it to get new articles as soon as they are published.