翻译Flink官网文档,Flink在YARN集群提交job,调试,以及命令行提交格式。最后,是Flink与YARN的交互分析。
文中会夹杂一些实践经验,读者可以参考或者进行实践,完善。
快速开始
在YARN上启动一个长期的Flink集群
启动一个拥有4个Task Manager的yarn会话,每个Task Manager有4gb的堆内存:
# 从flink下载页获取haddoop2包# http://flink.apache.org/downloads.htmlcurl -O tar xvzf flink-{{ site.version }}-bin-hadoop2.tgzcd flink-{{ site.version }}/./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
特别指出,-s参数表示每个Task Manager上可用的处理槽(processing slot)数量。我们建议把槽数量设置成每个机器处理器的个数。
一旦会话被启动,你可以使用./bin/flink工具提交任务到集群上。
在YARN上运行一个Flink的任务
# 从flink下载页获取haddoop2包# http://flink.apache.org/downloads.htmlcurl -O tar xvzf flink-{{ site.version }}-bin-hadoop2.tgzcd flink-{{ site.version }}/./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
Flink YARN 会话
Apache Hadoop YARN是一个资源管理框架,允许一个集群上运行多种分布式应用程序。
Flink 可以和其他应用程序一起在 YARN 上运行。如果已经启动了YARN,用户就不需再启动或安装任何东西。
要求
- Apache Hadoop版本至少2.2
- HDFS(Hadoop分布式文件系统)(或其他由Hadoop支持的分布式文件系统).
如果你在使用Flink YARN客户端有问题时,请看此问题论坛.
启动Flink会话
跟随以下介绍学习怎样在你的yran集群中启动一个Flink会话.
一个会话将启动所有Flink服务(JobManager and TaskManagers),这样你就可以提交程序给集群运行,记住在一个会话中可以运行
多个程序。
下载Flink
下载一个Hadoop版本大于2的Flink包,可从该下载页获得。它包含了所需的文件。
提取下载包的方法:
tar xvzf flink-1.4-SNAPSHOT-bin-hadoop2.tgzcd flink-1.4-SNAPSHOT/
启动一个会话
使用如下命令来启动一个会话,
1./bin/yarn-session.sh
该命令的概览如下:
使用: 要求: -n,--container YARN上容器个数 (=taskmanager的个数) 可选参数 -D 动态属性 -d,--detached 启动分离(提交job的机器与yarn集群分离) -jm,--jobManagerMemory JobManager Container内存大小 [in MB] -nm,--name 自定义提交job的名字 -q,--query 展示yarn的可用资源,内存和核数 (memory, cores) -qu,--queue 指定yarn队列. -s,--slots 每个TaskManager的处理槽数 -tm,--taskManagerMemory 每个TaskManager Container的内存大小 [in MB] -z,--zookeeperNamespace 在高可用模式下,命名空间为zookeeper创建子路径
请注意,客户端需要 YARN_CONF_DIR 或 HADOOP_CONF_DIR 环境变量被设置好,可以通过它读取 YARN 和 HDFS 的配置。
例子: 如下命令分配10个Task Manager,每个拥有8GB内存和32个处理槽:
1./bin/yarn-session.sh -n 10 -tm 8192 -s 32
系统将使用conf/flink-conf.yaml下的配置。如果你想更改一些配置,请参考配置手册。
Flink在YARN上,将会重写如下配置参数的值,jobmanager.rpc.address(因为Job Manager总是分配在不同机器上),
taskmanager.tmp.dirs(我们使用YARN给的tmp目录),parallelism.default(如果槽个数被指定)。
如果你不想改变配置文件来设置配置参数,这里有个方法来获得动态属性,通过-D标示。这样可以通过以下方法来传递参数,
-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624.
例子将请求启动11个容器(尽管仅需10个容器),因为这需要额外的1个容器给ApplicationMaster and Job Manager.
只要Flink部署在YARN集群上,它会让你看到Job Manager间的连接细节。
通过停止unix进程(使用CTRL+C命令)来停止YARN会话,或者在客户端输入stop。
Flink在YARN上仅仅启动所请求的容器,如果YARN集群上有足够的可用资源。大多YARN调度程序为容器,计算请求内存,一些还计算vcores数量。
默认情况,vcores数量等于处理节点数(-s),yarn.containers.vcores允许自定义值重写vcores数量。
隔离YARN会话
如果你不想保持Flink YARN客户端一直运行,可以启动隔离YARN会话来达到目的。这个参数即是-d或–detached。
在此情况下,Flink YARN客户端将仅提交Flink到集群中,然后关闭连接。注意的是在此情况下,将不可能使用Flink来停止YARN会话。
使用YARN命令(yarn application –kill )来停止YARN会话。
关联现有会话
使用如下命令启动一个会话
./bin/yarn-session.sh
这个命令将展示如下概览:
参数必须: -id,--applicationId YARN application Id
如之前所述,YARN_CONF_DIR 或 HADOOP_CONF_DIR环境变量需设置能让YARN 和 HDFS 配置读取到。
例子: 假设以下命令关联一个正运行的Flink YARN会话application_1463870264508_0029
./bin/yarn-session.sh -id application_1463870264508_0029
使用YARN 资源管理器来决定Job Manager的RPC端口从而关联一个运行的会话。
停止YARN会话可通过停止unix进程(CTRL+C)或通过再客户端输入stop。
提交job到Flink
使用如下命令提交一个Flink程序到YARN集群:
./bin/flink
请参考命令行客户端文档。
命令行帮助菜单如下:
run操作编译和运行程序。 语法: run [OPTIONS] "run" 操作参数: -c,--class 程序入口的类 ("main"方法 或 "getPlan()" 方法.jar文件没有在其清单中指定类才需要. -m,--jobmanager 连接Job Manager(master)的地址. 使用此参数连接一个不同的job管理器,而不是在配置中指明. -p,--parallelism 运行程序的并行度. 这个可选参数可覆盖配置中指定的默认值。
用run操作提交一个job到YARN上。客户端可以决定Job Manager的地址。罕见情况下,你可使用-m参数指定Job Manager地址。Job Manager地址可在YARN控制台见到。
例子
wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txthadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ..../bin/flink run ./examples/batch/WordCount.jar hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
如果存在如下错误,请确保所有Task Manager已经启动:
Exception in thread "main" org.apache.flink.compiler.CompilerException: Available instances could not be determined from job manager: Connection timed out.
你可以在Job Manager的web接口中查看Task Manager的数量。接口的地址会在YARN会话的控制台中输出。
如果Task Manager一分钟内没有显示出,那么你应该在日志文件中检查错误在哪。
在YARN上运行一个Flink 任务
上述文档描述了如何启动一个Flink集群在Hadoop YARN环境下。这也可以仅执行一个job而启动Flink在YARN下。
请注意客户端需要-yn值来设置Task Manager的数量。
例子:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
在YARN会话下命令行 ./bin/flink tool是可选的,以y或yarn前缀。
注意:你可以通过设置FLINK_CONF_DIR环境变量来为每个job使用不同的配置目录。
使用这个将拷贝来自Flink分布下conf目录,并更新每个job的日志。
注意:组合-m yarn-cluster和隔离YARN会话(-yd)命令可”焚毁和忘掉”提交Flink job在YARN集群中。
在此情况下,你的应用程序将得不到任何确认结果或 排除ExecutionEnvironment.execute()的请求消息。
使用jars&Classpath
默认下,Flink会把用到的jars带进系统路径,当运行一个job时。这个行为可以用yarn.per-job-cluster.include-user-jar
参数来控制。
当设置这个参数为DISABLED时,Flink将把用户路径的jars带进。
user-jars在系统路径位置可以通过设置参数来控制:
- ORDER:默认,按照字典路径顺序添加jar进系统。
- FIRST:系统路径最前的添加。
- LAST:系统路径最后的添加。
Flink在YARN上的恢复行为
Flink的YARN客户端有如下配置参数来控制行为当容器失败后,这些参数可通过conf/flink-conf.yaml设置,也可以通过
在启动YARN会话时用-D参数设置。
- yarn.reallocate-failed: 控制Flink是否重新分配失败的Task Manager。默认true。
- yarn.maximum-failed-containers: ApplicationMaster接受的最大容器失败个数,直到YARN会话失败。默认是-n设置的Task Manager个数。
- yarn.application-attempts: ApplicationMaster(+其拥有的Task Manager个数)的尝试次数,默认1,ApplicationMaster失败则YARN会话整个失败。在YARN中指定更大值以便重启ApplicationMaster。
调试一个失败的YARN会话
有很多原因使得一个Flink的YARN会话失败。一个错误的Hadoop安装(HDFS权限,YARN配置),版本兼容(运行Flink在vanilla的Hadoop上,却依赖Cloudera Hadoop)或其他原因。
日志文件
部署时Flink YARN会话失败,用户必须依靠Hadoop YARN的日志。
最有用的是YARN日志集合。用户必须在yarn-site.xml文件中把yarn.log-aggregation-enable参数值设置为true,
使其生效。只要它一经生效,用户可以使用如下命令来检索一个(失败)yarn会话的所有日志文件。
yarn logs -applicationId
在会话结束时请等待几秒钟直到日志展示出来。
YARN客户端控制台&web接口
Flink YARN客户端也可以在终端输出错误信息,如果在运行时出错(如某时间Task Manager停止工作).此外,有YARN资源管理器的web接口(默认是8088端口),这个资源管理器web接口的端口由
yarn.resourcemanager.webapp.address参数值决定。
在web页面可访问运行YARN应用程序的日志文件并可显示失败应用程序的诊断信息。
为指定Hadoop版本构建YARN客户端
用户使用像Hortonworks, Cloudera or MapR等公司发布的Hadoop,它们的Hadoop(HDFS)版本和YARN版本可能与构建Flink冲突,
请参考构建介绍获得更细介绍。
防火墙后在YARN运行Flink
一些YARN集群使用防火墙来控制集群和余下网络之间的网络传输,在这种配置下,Flink的job提交到YARN会话中只能通过集群网络(在防火墙背后),
如果在生产环境下不可行,Flink允许配置一定范围的端口给相关服务,
在这些范围配置下,用户可以跨越防火墙提交job到Flink。
当前,有两个服务需要提交job:
- Job Manager(YARN上的ApplicationMaster)
- 运行Job Manager的BlobServer
当提交一个job到Flink,BlobServer将会分发用户代码中的jars给所有工作节点(Task Manager),
Job Manager接收job本身并触发执行。
以下两个配置参数可指定端口:
- yarn.application-master.port
- blob.server.port
这两个配置可接收单个端口值(如50010),也可以接收范围(50000-50025),或者
组合(50010,50011,50020-50025,50050-50075)
(Hadoop使用同样的机制,配置参数是yarn.app.mapreduce.am.job.client.port-range)
背后/内部
本小节简要描述Flink和YARN如何交互.
YARN客户端需要访问Hadoop的配置以连接YARN资源管理器和HDFS,这决定了Hadoop配置采取如下策略,
- 测试YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH (按此顺序)是否已配置,其中一个配置了,它们就可以读取到配置。
- 如若上述策略失败(正确的YARN安装不会出现此情况),客户端使用HADOOP_HOME环境变量。如环境变量设置了,客户端会尝试访问$HADOOP_HOME/etc/hadoop(hadoop2.)或 $HADOOP_HOME/conf(hadoop1.)
当启动一个新的Flink YARN会话,客户端会先确认请求的资源(容器和内存)是否能获得到。
之后,客户端上传包含Flink和HDFS配置的jars(步骤1)。
下一步客户端请求一个YARN容器(步骤2)来启动ApplicationMaster(步骤3),
客户端注册了配置和容器资源的jar文件,指定机器运行的YARN节点管理器会准备好容器(下载文件),
这些结束了,ApplicationMaster (AM)就启动了。
Job Manager和AM运行在同一个容器里,它们成功启动后,AM知道job管理器(它拥有的主机)的地址。
Job Manager为Task Manager生成一个新的Flink配置(这样task可连接Job Manager)。
文件也上传到HDFS上。另外AM容器也为Flink的web接口服务。YARN代码的所有端口是分配的临时端口。
这可让用户并行执行多个yarn会话。
然后,AM启动分配到的容器,这些容器给Flink的Task Manager,将会下载jar和更新来自HDFS配置
,这些步骤完成后,Flink就安装起来了,可以接收job了。
实践经验
1,Flink的流job运行,会提交的终端一直保持执行,如果想在后台运行,可使用supervior监控执行的进程,这样不会在终端显示运行,另外,job报错也会自动再次提交运行;
2,Flink的job提交YARN集群执行时,建议把参数-yd带上,即断开提交客户端与YARN集群的通信。如果不加-yd参数,会在提交job的机器上占用很大内存保持job在YARN上的通信,造成机器资源占用。断开连接后怎么保持job的可靠性,即失败时可自动提交,这功能可在Flink的job代码中配置重启策略:
12345678910111,失败后尝试重启动策略在失败后,尝试重启动3次,每次间隔10秒;如果尝试3次后,job还是失败,则不再重启,宣告job失败;不是立刻执行重启,而是延迟固定时间,这是为了让底层有时间准备好环境,比如外部系统连接.import org.apache.flink.api.common.time.Timeimport java.util.concurrent.TimeUnitimport org.apache.flink.api.common.restartstrategy.RestartStrategiesval env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // number of restart attempts Time.of(10, TimeUnit.SECONDS) // delay))2,失败率重启策略在一定时间段内重启失败率太高,则宣告job失败;在两个尝试策略下,会等待一定时间.时间间隔内尝试重启job的次数;时间间隔多长;两次重启间的延迟时长;val env = ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // max failures per unit Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate Time.of(10, TimeUnit.SECONDS) // delay))
3,Flink的新版本,最好在assembly项目时把包到带上,虽然包很大,但是为了YARN的其他开源版本兼容,建议使用程序的包。这样提交运行时不用–classpath带上包。