一、安装
1、环境准备:
- 环境变量配置:
export JAVA_HOME=/usr/share/java/jdk1.8.0_131
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export HADOOP_HOME=/usr/hdp/3.1.0.0-78/hadoop/
export HADOOP_CONF=/usr/hdp/3.1.0.0-78/hadoop/conf
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_CLASSPATH=`${
HADOOP_HOME}/bin/hadoop classpath`
- ssh配置
- flink的安装包:flink-1.10.0-bin-scala_2.12.tgz
2、安装
1、standalone模式
- 集群规划:daas4(master),daas5(slave),daas6(slave)
- 解压:`tar -zxvf flink-1.10.0-bin-scala_2.12.tgz
- 修改配置文件:
vi $FLINK_HOME/conf/flink-conf.yaml
#设置jm的地址
jobmanager.rpc.address: daas4
#修改目录的输出位置
env.log.dir: /var/log/flink
vi $FLINK_HOME/conf/master
daas4
vi $FLINK_HOME/conf/slaves
daas5
daas6
- 启动及关闭
启动:./bin/start-cluster.sh
停止:./bin/stop-cluster.sh
-
测试
(1)离线测试
准备测试数据:
因为是集群模式,所以任务可能在daas5或者daas6上执行,两台机器都要有/data/word.txt.或者上传到hdfs上。
运行命令:
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/batch/WordCount.jar –input /data/word.txt –output /data/out.txt
(2)实时测试:
准备测试数据:daas6上 nc -l 9999产生数据
运行命令:
$FLINK_HOME/bin/flink run $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar –host daas6 –port 9999
结果查看:webUI访问:daas4:8081
-
内部提交流程:
- client客户端提交任务给JobManager
- JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行
- TaskManager定期向JobManager汇报状态
-
如果需要配置HA
修改配置文件:
vi $FLINK_HOME/conf/flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: daas3:2181,daas4:2181,daas5:2181
high-availability.storageDir: hdfs://daas/flink/recovery
执行流程:
-
容错配置 checkpoint & savepoint
修改配置文件:
vi $FLINK_HOME/conf/flink-conf.yaml
state.backend: rocksdb
#存储检查点的数据文件和元数据的默认目录
state.backend.fs.checkpointdir: hdfs://daas/flink/pointsdata/
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# relative path is better for change your cluster
state.checkpoints.dir: hdfs://daas/flink/checkpoints/
# Default target directory for savepoints, optional.
#
#state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://daas/flink/savepoints/
#
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#开启增量checkpoint 全局有效
state.backend.incremental: true
#保存最近检查点的数量
state.checkpoints.num-retained: 3
2、yarn模式
准备工作:
需要将依赖hadoop的jar包及其他jar包放到flink/lib目录下
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
以下三个jar包可以重新编译上面的hadoop包加入依赖,也可以直接加到flink的lib下
jersey-client-1.9.jar
jersey-common-2.9.jar
jersey-core-1.9.jar
运行在yarn上有两种方式:session与per-job方式
2.1 session 方式
共享dispatcher和resourceManager,共享资源即taskManager,适合规模小,执行时间较短的job.
使用步骤:
1、开启yarn-session ./bin/yarn-session.sh -n 2 -tm 800 -s 1
参数说明:
yarn-session.sh脚本可以携带的参数:
Required
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
Optional
-D <arg> 动态属性
-d,--detached 会自己关闭cient客户端
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-at,--applicationType Set a custom application type on YARN
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
开启后下面会显示jobmanager启动的位置,用于下面的任务提交:
2、准备数据:daas6: nc -l 9999
3、提交任务:
$FLINK_HOME/bin/flink run -m daas6:38037 $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar –host daas6 –port 9999
4、查看结果:
2.2 Per-job方式:
独享dispatcher和resourceManager,独享资源即taskManager,适合执行时间长的大job.
使用步骤:
1、 测试数据:socket发送消息:nc -l 9999
2、提交任务:/var/opt/flink/bin/flink run -m yarn-cluster -d /var/opt/flink/examples/streaming/SocketWindowWordCount.jar –host daas6 –port 9999
4、查看结果:
5、关闭任务:
echo “stop” | ./bin/yarn-session.sh -id
2.3 内部运行流程:
- 检查资源是否存在,上传jar包和配置文件到HDFS集群上
- 申请资源和请求AppMaster容器
-
Yarn分配资源AppMaster容器,并启动JobManager
• JobManager和ApplicationMaster运行在同一个container上。
• 一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。
• 它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。
• 这个配置文件也被上传到HDFS上。
• 此外,AppMaster容器也提供了Flink的web服务接口。YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink - 申请worker资源,启动TaskManager
-
拓展:flink内部流程
二、Flink的应用(主要为flink sql)
1、sql-client方式:
-
准备数据:/data/video.csv
-
准备启动的环境配置文件:也可以不配置,将sql直接写在sql-client中
- 启动sql-client: ./bin/sql-client.sh embedded -e ./envConf/myTab.yaml
- 执行语句:select * from video_search;
-
查看结果:
2、JAVA API方式
2.1 准备测试数据:生成数据,发送到kafka的topic
pom.xml依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency