flink的Standalone-HA模式安装

  • Post author:
  • Post category:其他


前言

本文使用flink1.14.5版本,介绍standalone-HA模式的安装。

此模式时高可用架构,采用zookeeper协调多个JobManager,保持每时每刻有一个运行中的JobManager,其余JobManager处理stand by状态。

因为涉及到运行过程中的状态数据的存储,如savepoint,checkoutpoint等。采用minio替换掉hdfs来存储状态。

一、资源说明

1、flink集群机器

机器ip hostname JobManager运行节点 TaskManager运行节点 备注
10.113.1.121 flinknode1 JobManager
10.113.1.122 flinknode2 JobManager
10.113.1.123 flinknode3 TaskManager
10.113.1.124 flinknode4 TaskManager

如果是standalone-ha模式最少使用两台机器,保证有两个JobManager。TaskManager可以与JobManager部署在一起,也可以单独部署

2、zookeeper集群(安装省略)

hostname 运行进程
zknode1 zookeeper
zknode2 zookeeper
zknode3 zookeeper

3、minio(安装省略)

miniodocker单机版安装参看,

minio之docker的单机版安装_神云瑟瑟的博客-CSDN博客_docker minio

hostname 运行进程
minionode minio

需要再minio中创建桶,本文用的桶名为“flink”

配置对该桶有读写权限的账户

4、各个组件架构图

二、机器的基础配置

1、对flinknode机器安装java环境

flink1.14版本可以使用jdk8,1.15后续的版本不能再使用jdk8了。

2、对flinknode1-4,4台机器配置hostname,并且在/etc/hosts文件中配置映射,(如果不配置hosts就使用ip访问)

实践生产中建议设置hosts

3、设置flinknode机器相互免密登录

本文配置的是相互都免密登录,

实际好像是保证JobManager可以免密登录TaskManager就可以了

4、创建存放软件的目录

/opt/soft

这个目录因人而异,本文是存放在以上目录的

三、下载安装

1、下载并且解压

下载预览界面:

https://flink.apache.org/zh/downloads.html

登录flinknode1

cd /opt/soft
wget https://www.apache.org/dyn/closer.lua/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
tar zxvf flink-1.14.5-bin-scala_2.12.tgz

2、修改配置

a、对${FLINK_HOME}/conf/flink-conf.yaml修改,在文件最后添加配置

fs.allowed-fallback-filesystems: s3
state.backend: filesystem
#checkout存储地址
state.checkpoints.dir: s3://flink/checkpoint
#savepoint存储地址
state.savepoints.dir: s3://flink/savepoint
#minio的地址
s3.endpoint: http://minionode:9000
s3.path.style.access: true
#minio的访问key
s3.access-key: xxxx
s3.secret-key: xxxx

high-availability: zookeeper
#zookeeper地址
high-availability.zookeeper.quorum: zknode1:2181,zknode2:2181,zknode3:2181
#recovery存储地址
high-availability.storageDir: s3://flink/recovery
#zookeeper没有设置密码,就使用open就ok
high-availability.zookeeper.client.acl: open

b、对${FLINK_HOME}/conf/masters修改,修改后的配置如

该配置中说明了运行JobManager的节点和IP

flinknode1:8081
flinknode2:8081

c、对${FLINK_HOME}/conf/works修改,修改后的配置如

该配置中说明了运行TaskManager的节点和IP

flinknode3
flinknode4

d、对${FLINK_HOME}/bin/config.sh修改

大概在107行的位置对变量`DEFAULT_ENV_PID_DIR`的值进行修改,修改后的内容如下

DEFAULT_ENV_PID_DIR="./.flinkpid"                          # Directory to store *.pid files to

主要对master和work启动后的  StandaloneSessionClusterEntrypoint进程和TaskManagerRunner进程的pid存储位置修改,默认存储在“/tmp”目录,操作系统可能会清理该目录的资源,造成启动后的flink集群无法使用“${FLINK_HOME}/bin/stop-cluster.sh”命令停止集群。

本文使用相对目录,可以存储在用户目录下,如“/root/.flinkpid/”目录

保存进程的文件为:

/root/.flinkpid/flink-root-standalonesession.pid

/root/.flinkpid/flink-root-taskexecutor.pid

3、拷贝s3支持的包,flink-s3-fs-hadoop-1.14.5.jar

cd /opt/soft/flink-1.14.5
mkdir plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.14.5.jar ./plugins/s3-fs-hadoop/

如果未执行上面的s3支持的包的拷贝,会报错

Could not find a file system implementation for scheme ‘s3’

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

4、分发修改后的flink文件夹

cd /opt/soft
scp -r flink-1.14.5 root@flinknode2:/opt/soft
scp -r flink-1.14.5 root@flinknode3:/opt/soft
scp -r flink-1.14.5 root@flinknode4:/opt/soft

5、在主节点上启动

cd /opt/soft/flink-1.14.5
./bin/start-cluster.sh

在主节点上启动后,会自动将其他节点的任务也启动起来。

6、配置historyServer(非必须)

对${FLINK_HOME}/conf/flink-conf.yaml修改,在文件最后添加配置

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
# jobmanager上传记录地址
jobmanager.archive.fs.dir: s3://flink/completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
## historyserver查看记录地址
historyserver.archive.fs.dir: s3://flink/completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000

找一个节点,通过以下命令来启动停止history服务

./bin/historyserver.sh start
./bin/historyserver.sh stop

四、相关命令说明

命令 说明
./bin/start-cluster.sh 整体启动集群
./bin/stop-cluster.sh 整体停止机器
./bin/jobmanager.sh 对本节点jobManager的操作,start,stop
./bin/taskmanager.sh 对本节点taskManager的操作,start,stop

使用后面两条命令,可以定点维护某些需要维护的节点

五、相关进程说明

使用jps命令查看进程

进程名字
StandaloneSessionClusterEntrypoint jobManager
TaskManagerRunner taskmanager



版权声明:本文为shenyunsese原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。