Using PackagedProgram to wrap a Flink program, build a JobGraph, and submit it to a Flink cluster
1. Introduction to PackagedProgram
- Official description
This class encapsulates represents a program, packaged in a jar file. It supplies functionality to extract nested libraries, search for the program entry point, and extract a program plan.
- My understanding
It wraps up a written Flink application together with everything needed to run it: the jar file, the mainClass (main entry point), the args (program arguments), and the savepoint settings.
2. Usage
I have recently been developing a Flink-related component and found that, besides submitting Flink jobs remotely via the REST API, you can also build a PackagedProgram, create a JobGraph from it, and submit the program to a Flink cluster running in local or remote mode.
This requires the flink-clients dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
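If your project is built with sbt rather than Maven, the same artifact (same coordinates and version) can be declared as follows:

libraryDependencies += "org.apache.flink" % "flink-clients_2.12" % "1.13.0"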
3. Creating a PackagedProgram
3.1 PackagedProgram.newBuilder
The instance is created through its builder, which needs a few required pieces of information, such as the mainClass and the jar path:
val packagedProgram: PackagedProgram = PackagedProgram
  .newBuilder
  // the main entry class of your Flink program
  .setEntryPointClassName("your.program.MainClass")
  // the jar file containing your Flink program
  .setJarFile(jarFile)
  // savepoint restore settings
  .setSavepointRestoreSettings(SavepointRestoreSettings.none)
  // any arguments your Flink program may need
  .setArguments("someArg", "1", "2", "3")
  .build()
3.2 SavepointRestoreSettings
These are the savepoint restore settings; they can be created directly through the static factory methods on the class:
// no savepoint to restore
SavepointRestoreSettings.none
// restore from a savepoint path
SavepointRestoreSettings.forPath(path, allowNonRestoredState)
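A minimal sketch of the restore case, assuming a hypothetical savepoint path; allowNonRestoredState = true lets the job start even if some state in the savepoint can no longer be mapped to an operator in the new program:

// hypothetical savepoint directory
val savepointPath: String = "hdfs:///flink/savepoints/savepoint-abc123"
// skip state that no longer maps to any operator instead of failing
val restoreSettings: SavepointRestoreSettings =
  SavepointRestoreSettings.forPath(savepointPath, true)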
4. Creating a JobGraph via PackagedProgramUtils
4.1 createJobGraph source
This class offers two static methods that create a JobGraph directly; the difference is that one takes a pre-generated JobID while the other generates a random one.
/**
* Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
* PackagedProgram}.
*
* @param packagedProgram to extract the JobGraph from
* @param configuration to use for the optimizer and job graph generator
* @param defaultParallelism for the JobGraph
* @param jobID the pre-generated job id
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism,
@Nullable JobID jobID,
boolean suppressOutput)
throws ProgramInvocationException {
final Pipeline pipeline =
getPipelineFromProgram(
packagedProgram, configuration, defaultParallelism, suppressOutput);
final JobGraph jobGraph =
FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
packagedProgram.getUserCodeClassLoader(),
pipeline,
configuration,
defaultParallelism);
if (jobID != null) {
jobGraph.setJobID(jobID);
}
jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
jobGraph.setClasspaths(packagedProgram.getClasspaths());
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
return jobGraph;
}
4.2 Creating the JobGraph
- packagedProgram: the PackagedProgram built above
- configuration: the Flink configuration
- defaultParallelism: the parallelism for the JobGraph; the default value works too
- jobID: the pre-generated job id; pass null to have a random one generated
- suppressOutput: boolean, whether to suppress stdout/stderr output during JobGraph generation
val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
packagedProgram,
flinkConfig,
parallelism,
null,
false
)
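Passing null for the jobID, as above, lets Flink generate a random one. Here is a small sketch of the other variant mentioned in 4.1, pre-generating the id so it is known before submission (assumes import org.apache.flink.api.common.JobID):

// pre-generate the JobID so the caller knows it up front
val fixedJobId: JobID = JobID.generate()
val jobGraphWithId: JobGraph = PackagedProgramUtils.createJobGraph(
  packagedProgram,
  flinkConfig,
  parallelism,
  fixedJobId,
  false
)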
- The configuration can also be obtained by loading the local flink-conf.yaml; just pass in the Flink installation path.
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
}
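A short usage sketch, assuming a hypothetical installation under /opt/flink; since GlobalConfiguration.loadConfiguration only reads the yaml file, individual options can still be overridden programmatically afterwards (CoreOptions.DEFAULT_PARALLELISM and the JavaInt alias for java.lang.Integer are used here purely as an illustration):

// load flink-conf.yaml from a hypothetical installation path
val flinkConfig: Configuration = getFlinkDefaultConfiguration("/opt/flink")
// override a single option on top of the loaded file
flinkConfig.set[JavaInt](CoreOptions.DEFAULT_PARALLELISM, 2)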
5. Submitting the JobGraph to a local-mode Flink cluster
5.1 Building a MiniCluster
Building a MiniCluster is straightforward; you only need to set a few required parameters, such as the number of TaskManagers and the number of slots per TaskManager:
// set the required properties: number of TaskManagers, slots per TaskManager, etc.
val miniClusterConfig: MiniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumTaskManagers(numTaskManagers)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build()
val cluster = new MiniCluster(miniClusterConfig)
cluster.start()
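One thing to keep in mind: MiniCluster implements AutoCloseableAsync, so once you are done with the local cluster it should be shut down, e.g.:

// shut the local cluster down when the job is finished
cluster.close()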
5.2 Building a MiniClusterClient
Once the cluster is up, build a client and point it at the cluster's host and port to interact with it:
val host: String = "localhost"
val port: Int = miniCluster.getRestAddress.get.getPort
flinkConfig
  .set(JobManagerOptions.ADDRESS, host)
  .set[JavaInt](JobManagerOptions.PORT, port)
  .set(RestOptions.ADDRESS, host)
  .set[JavaInt](RestOptions.PORT, port)
  .set(DeploymentOptions.TARGET, RemoteExecutor.NAME)
val client: ClusterClient[MiniClusterId] = new MiniClusterClient(flinkConfig, miniCluster)
5.3 Submitting the job
If the submission succeeds, the JobID is returned:
val jobId: String = client.submitJob(jobGraph).get().toString
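As a small follow-up sketch, the same client can then be used to poll the job; like submitJob, getJobStatus returns a CompletableFuture:

// poll the current status of the submitted job
val status = client.getJobStatus(jobGraph.getJobID).get()
println(s"job ${jobGraph.getJobID} is in state $status")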
6. Full source
package com.chenzhiling.flink.submitter
import org.apache.flink.client.deployment.executors.RemoteExecutor
import org.apache.flink.client.program.MiniClusterClient.MiniClusterId
import org.apache.flink.client.program.{ClusterClient, MiniClusterClient, PackagedProgram, PackagedProgramUtils}
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointRestoreSettings}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import java.io.File
import java.lang.{Integer => JavaInt}
import java.util.{List => JavaList}
import scala.collection.JavaConversions._
import scala.util.Try
/**
* Author: CHEN ZHI LING
* Date: 2022/6/9
* Description:
*/
object FlinkLocalSubmitter {
  def submit(flinkHome: String, mainClass: String, jar: File, args: JavaList[String]): String = {
var packageProgram: PackagedProgram = null
var client: ClusterClient[MiniClusterId] = null
val flinkConf: Configuration = getFlinkDefaultConfiguration(flinkHome)
try {
      // build the JobGraph
      val packageProgramJobGraph: (PackagedProgram, JobGraph) =
        getJobGraph(flinkConf, mainClass, args, jar)
packageProgram = packageProgramJobGraph._1
val jobGraph: JobGraph = packageProgramJobGraph._2
      // create the local cluster
client = createLocalCluster(flinkConf)
      // submit the job
val jobId: String = client.submitJob(jobGraph).get().toString
jobId
    } finally {
      if (null != packageProgram) packageProgram.close()
      if (null != client) client.close()
}
}
private[submitter] def getJobGraph(flinkConfig: Configuration,
mainClass: String,
args: JavaList[String],
jarFile: File): (PackagedProgram, JobGraph) = {
val packagedProgram: PackagedProgram = PackagedProgram
.newBuilder
      // the main entry class
.setEntryPointClassName(mainClass)
.setJarFile(jarFile)
.setSavepointRestoreSettings(SavepointRestoreSettings.none())
      // :_* expands the (implicitly converted) argument list into varargs
.setArguments(args:_*)
.build()
val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
packagedProgram,
flinkConfig,
1,
null,
false
)
packagedProgram -> jobGraph
}
private[this] def createLocalCluster(flinkConfig: Configuration): MiniClusterClient = {
val miniCluster: MiniCluster = createMiniCluster(flinkConfig)
val host: String = "localhost"
val port: Int = miniCluster.getRestAddress.get.getPort
    flinkConfig
      .set(JobManagerOptions.ADDRESS, host)
      .set[JavaInt](JobManagerOptions.PORT, port)
      .set(RestOptions.ADDRESS, host)
      .set[JavaInt](RestOptions.PORT, port)
      .set(DeploymentOptions.TARGET, RemoteExecutor.NAME)
    new MiniClusterClient(flinkConfig, miniCluster)
}
  /**
   * Build the MiniCluster.
   * @param flinkConfig the Flink configuration
   * @return the started MiniCluster
   */
private[this] def createMiniCluster(flinkConfig: Configuration): MiniCluster = {
val numTaskManagers: Int = ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER
val numSlotsPerTaskManager: Int = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS)
val miniClusterConfig: MiniClusterConfiguration =
new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumTaskManagers(numTaskManagers)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build()
val cluster = new MiniCluster(miniClusterConfig)
cluster.start()
cluster
}
  /**
   * Loads flink-conf.yaml from the given flinkHome;
   * it holds all configured Flink settings, essentially a key-value map.
   */
private[submitter] def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
}
}
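To round this off, a hedged usage sketch of the submitter above; the install path, entry class, and jar path are all hypothetical placeholders:

import java.io.File
import java.util.Arrays

object FlinkLocalSubmitterDemo extends App {
  // all paths and class names below are hypothetical placeholders
  val jobId: String = FlinkLocalSubmitter.submit(
    "/opt/flink",                              // flinkHome
    "com.example.WordCount",                   // mainClass
    new File("/path/to/word-count.jar"),       // jar
    Arrays.asList("--input", "/tmp/input.txt") // args
  )
  println(s"submitted job: $jobId")
}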
7. Wrapping up
That completes submission to a local cluster; next I will try submitting to a remote cluster.