Flink安装及使用笔记

  • Post author:
  • Post category:其他





Apache Flink: 数据流上的有状态计算



https://flink.apache.org/zh/#



上面连接是flink的官网,里面有很详细的文档。这里对flink进行大体总结。

原理:

图片从官网拷的,主要描述了3者关系:

FlinkProgram:调用发起方

JobManager:任务调度方

TaskManager:任务执行方

举个栗子:客户提了个需求 给老大,老大说ok,让张三的团队去干。那么客户是FlinkProgram,老大是JobManager,张三是TaskManager,张三下面的人具体执行任务。

在这个案例中,老板掌握了所有资源,张三使用了资源。所以在flink的系统中,JobManager是资源的上限,就是tasksolt。

tasksolt在启动JobManager时是可配置的,也是执行任务的最大并行数量。

还是栗子:客户说我需要5个人来写个软件,老大说,我这目前空余只有3个人,(那么此时tasksolt空闲为3),客户就只能一直等待,其他项目放出人员出来,凑齐5个后,就可以开始了,但是,这里具体是哪5个人呢,怎么分工呢,是张三(TaskManager)来决定的。

原理到此为止

使用方式

部署JobManager


Apache Flink: 下载

下载bin包,之后上传到服务器

tar -vxf xxx.tar 解压完成

进入bin包,启动start-cluster.sh

telnet -tnlp

启动成功,可以进行IP:8081进行访问了

此时,JobManager部署好了,接下来需要进行集群和优化配置:

进入flink根目录

vi conf/flink-conf.yaml

jobmanager.memory.process.size 1024m  jobmanger的最大内存,任务间传输数据量越大越多
taskmanager.memory.process.size 任务处理程序的最大内存,越大越好
taskmanager.numberOfTaskSlots

任务槽  并行的数量,可用:cpu*2,也可用

taskmanager.memory.process.size / 预估单任务内存占用量

parallelism.default 1,一般不要动

high-availability

zookeeper 高可用,这里用zk

high-availability.storageDir

/data/flinkha 写本地 或网络地址,都OK

high-availability.zookeeper.quorum

localhost:2181,localhost:2182  zk地址

其他的配置可根据需求针对更改

配置更改完成后,保存配置文件

bin/stop-cluster.sh

bin/start-cluster.sh

进行重启,此时可以看到数据已经更新

此时,新建一个maven项目,填入官方的例子

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

打包出来,上传到JobManager,点击submit,OK。在jobs  runningJobs中看到如下

在服务器上 :

nc -lk 9999

然后随便输入,这边就有输出了

后面要做的就是通过actorsystem提交job了



版权声明:本文为yangyongdehao30原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。