Apache Flink: 数据流上的有状态计算
https://flink.apache.org/zh/#
上面连接是flink的官网,里面有很详细的文档。这里对flink进行大体总结。
原理:
图片从官网拷的,主要描述了3者关系:
FlinkProgram:调用发起方
JobManager:任务调度方
TaskManager:任务执行方
举个栗子:客户提了个需求 给老大,老大说ok,让张三的团队去干。那么客户是FlinkProgram,老大是JobManager,张三是TaskManager,张三下面的人具体执行任务。
在这个案例中,老板掌握了所有资源,张三使用了资源。所以在flink的系统中,JobManager是资源的上限,就是tasksolt。
tasksolt在启动JobManager时是可配置的,也是执行任务的最大并行数量。
还是栗子:客户说我需要5个人来写个软件,老大说,我这目前空余只有3个人,(那么此时tasksolt空闲为3),客户就只能一直等待,其他项目放出人员出来,凑齐5个后,就可以开始了,但是,这里具体是哪5个人呢,怎么分工呢,是张三(TaskManager)来决定的。
原理到此为止
使用方式
部署JobManager
下载bin包,之后上传到服务器
tar -vxf xxx.tar 解压完成
进入bin包,启动start-cluster.sh
telnet -tnlp
启动成功,可以进行IP:8081进行访问了
此时,JobManager部署好了,接下来需要进行集群和优化配置:
进入flink根目录
vi conf/flink-conf.yaml
jobmanager.memory.process.size | 1024m jobmanger的最大内存,任务间传输数据量越大越多 |
taskmanager.memory.process.size | 任务处理程序的最大内存,越大越好 |
taskmanager.numberOfTaskSlots |
任务槽 并行的数量,可用:cpu*2,也可用 taskmanager.memory.process.size / 预估单任务内存占用量 |
parallelism.default | 1,一般不要动 |
high-availability |
zookeeper 高可用,这里用zk |
high-availability.storageDir |
/data/flinkha 写本地 或网络地址,都OK |
high-availability.zookeeper.quorum |
localhost:2181,localhost:2182 zk地址 |
其他的配置可根据需求针对更改
配置更改完成后,保存配置文件
bin/stop-cluster.sh
bin/start-cluster.sh
进行重启,此时可以看到数据已经更新
此时,新建一个maven项目,填入官方的例子
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
打包出来,上传到JobManager,点击submit,OK。在jobs runningJobs中看到如下
在服务器上 :
nc -lk 9999
然后随便输入,这边就有输出了
后面要做的就是通过actorsystem提交job了