0.备注
-
flink 源码版本:1.12
-
概念理解:
- flink 三种基本运行模式(抽象概念):Session、Per-job、Application
-
standalone、Yarn、k8s等——flink基于三种基本运行模式根据不同集群资源管理策略衍生出的不同实现类
图片转载自:https://www.jianshu.com/p/a727499250dc
1.三种集群的对比优缺点总结
总所周知,在flink-1.11引入了
Application
运行模式后,现在flink有三种集群运行模式:
- Session集群
- Per-job集群(简称job集群)
- Application集群
在flink官网中,粗看对于这几个集群的比对主要来自两个方面:
生命周期
和
资源隔离
,实际上还有
main()是在Clinet端执行还是在Cluster端执行
这个方面的对比。(client属于flink架构的一部分,但不属于flink集群。client的任务是连接用户和Cluster)。
1.1 先看每种模式的介绍:
-
Session :多个jobs共享一个JobManager,即所有的任务都运行在这一个集群。例如Standalone静态部署模式。适合单个规模小、执行时间短的大量作业。
- 集群生命周期:不与任务挂钩,作业完成后仍会继续运行知道手动停止session。
-
集群资源隔离:所有任务都运行在一个集群上面,所以隔离性差。Flink的Slot
仅能隔离内存,并不能隔离CPU资源
。如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败。 - main()方法在Client执行。
-
Per-job:每一个job都会动态创建一个专属于自己的集群。
- 集群生命周期:与任务挂钩,随任务运行创建;一旦作业完成,集群将被销毁。
- 集群资源隔离:一个任务独占一个集群,隔离性最好。
- main()方法在Client执行。
-
Application:Application这个词指的是包含一个或多个job的程序。用线程进程概念来类别,一个游戏进程(Application)会专门有渲染画面的线程(job)也会有播放音乐的线程(job)。此模式,一个Application动态创建一个专属自己的集群,Application内所有job共享该集群。
- 集群生命周期:与Flink 应用(Application)挂钩,随Application创建,随Application结束而消亡。
- 集群资源隔离:Application之间资源隔离,Application内所有job共享集群。
- main()方法在Cluster执行
图片转载自:https://www.jianshu.com/p/3898dd13f079
很显然,perjob模式和session模式其实是对颗粒度粗细的两个极端:perjob过细,session又过于粗糙。1.11引入的application模式则就是两者的折中方案。
1.2 一些结论
其他概念比较容易理解,main()方法在哪执行这点会挺让人迷惑的:究竟main()方法里面干了什么?导致main()方法成为指标的原因又是什么?
熟悉flink架构的大家应该记得,main()方法执行后,会获得任务的jar包及相关依赖jar包,同时,会将
StreamGraph
,最终生成
JobGraph
的转换。
main()方法在Client端执行,会给客户端带来额外的压力。且多个作业同时使用同一Client时,会存在单点瓶颈,拖累Cluster。
如果在集群侧执行main(),Client的工作就仅仅将任务的jar包提交即可,main()会在JM的集群入口类(ApplicationClusterEntryPoint)执行main()方法生成JobGraph。由于集群一般资源比较充足,并不会对集群带来太大的压力。
三种基本运行模式下根据不同集群资源管理策略衍生了多种不同的实现类,下一节我们会在三种基本运行模式的衍生实现类中挑选
2. main()方法 源码位置
2.1 StandaloneSessionClusterEntryPoint
Session集群我们以StandaloneSession为例,使用
./bin/start-cluster.sh
启动集群后,最终启动的就是StandaloneSessionClusterEntrypoint。
使用
./bin/flink run XXX.jar
命令提交job后,执行的flink脚本,参数为
run
。
会调用org/apache/flink/client/cli/CliFrontend.java 的main()方法,最终在该类的run方法内执行executeProgram方法,以反射的方式提交执行job的main()方法
/**
* Executions the run action.
*
* @param args Command line arguments for the run action.
*/
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
/*TODO 获取 用户的jar包和其他依赖*/
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
/*TODO 获取有效配置:HA的id、Target(session、per-job)、JobManager内存、TaskManager内存、每个TM的slot数...*/
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);
try {
/*TODO 执行程序,通过反射执行提交Job的main()方法,将用户程序转换成StreamGraph,并生成JobGraph提交到集群*/
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}
org/apache/flink/client/program/PackagedProgram.java 的callMainMethod(有删减)
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
Method mainMethod;
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
entryClass.getName() + ": " + t.getMessage(), t);
}
try {
/*TODO 调用用户代码的main方法*/
mainMethod.invoke(null, (Object) args);
} catch(……)
……
}
2.2 YarnApplicationClusterEntryPoint
Yarn管理下的Application模式,客户端只是进行jar包提交。对于executeProgram方法在集群侧执行。
提交job到集群的命令,参数是
run-application
./bin/flink run-application -t yarn-application XXX.jar
在org/apache/flink/client/cli/CliFrontend.java 的parseAndRun()方法会根据 run-application 参数调用 该类的runApplication()方法。
protected void runApplication(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
……
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
//调用ApplicationClusterDeployer.run()方法部署程序到集群中
deployer.run(effectiveConfiguration, applicationConfiguration);
}
org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java 的run方法代码如下
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration) throws Exception {
checkNotNull(configuration);
checkNotNull(applicationConfiguration);
LOG.info("Submitting application in 'Application Mode'.");
final ClusterClientFactory<ClusterID> clientFactory = clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor = clientFactory.createClusterDescriptor(configuration)) {
final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
}
}
至此,客户端并未执行StreamGraph和JobGraph的转换,只是将用户程序的JAR上传。