1. 基本概念
Flink是一款分布式的计算引擎,它可以用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时地处理一些实时数据流,实时地产生数据的结果;也可以用来做一些基于事件的应用。
kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
kafka名词解释:
producer:生产者。
consumer:消费者。
topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
2. 实现思路
实现流处理流程,对应核心要点描述如下:
1 读取配置文件生成配置JobConfig对象,设置Properties对象属性;
JobConfig conf = JobConfig.getInstance();
if (conf == null)
throw new ConfigurationException("配置项不能为空!");
Properties props = new Properties();
props.put("bootstrap.servers", conf.getKafkaCluster());
props.put("group.id", conf.getGroupId());
2 获取上下文环境StreamExecutionEnvironment对象与设置并行数;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (conf.getParallelism() != null)
env.setParallelism(conf.getParallelism());
3 通过topic、属性对象与序列化对象,声明Flink消费kafka的FlinkKafkaConsumer011数据对象与FlinkKafkaConsumer011规则对象;
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>
(conf.getCalcDataTopic(), new SimpleStringSchema(), props);
FlinkKafkaConsumer011<String> ruleConsumer = new FlinkKafkaConsumer011<>
(NOTICE_TOPIC, new SimpleStringSchema(), props);
4 将FlinkKafkaConsumer011数据对象加入到上下文环境StreamExecutionEnvironment对象中,并生成DataStream对象;
DataStream<String> dataStream = env.addSource(consumer);
5 声明表达式状态描述符的ExpressionStateDescriptor对象,并指定名称为CalcRuleBroadCastState;
ExpressionStateDescriptor expressionStateDescriptor = new ExpressionStateDescriptor("CalcRuleBroadCastState");
6 将FlinkKafkaConsumer011规则对象加入到上下文环境StreamExecutionEnvironment对象中生成BroadcastStream,并将ExpressionStateDescriptor对象对象广播到每个task下;
BroadcastStream<Notice<CalcMSRule>> ruleStream = env.addSource(ruleConsumer)
.map(new MapFunction<String, Notice<CalcMSRule>>() {
@Override
public Notice<CalcMSRule> map(String value) throws Exception {
Notice<CalcMSRule> notice = JSON.parseObject(value,
new TypeReference<DefaultNotice<CalcMSRule>>() {
});
return notice;
}
}).broadcast(expressionStateDescriptor);
7 将数据流DataStream对象的json字符串转化为对象,并去除空对象;
DataStream<Measurement> msStream = dataStream
.map((String jsonStr) -> TSParser.parseMeasurement(jsonStr))
.filter((Measurement m) -> !m.equals(Measurement.empty()));
8 声明mysql与influxdb数据的连接配置对象;
MysqlConf mysqlConf = new MysqlConf(conf.getMysqlUrl(),
conf.getMysqlUsername(), conf.getMysqlPassword());
InfluxDBConf influxDBConf = new InfluxDBConf(conf.getInfluxdbAddress(),
conf.getInfluxdbDatabase());
9 将数据流DataStream与规则流BroadcastStream连接可以得到新的连接connectedStream,基于connectedStream设置CalcMsFunction实现,来处理新的Stream中的数据记录,可以在每个Task中基于获取到统一的规则配置信息,进而处理用户事件,并设置名称计算点算子。
DataStream<MeasuringSignal> resultStream = msStream.connect(ruleStream)
.process(new CalcMsFunction(mysqlConf, influxDBConf)).name("计算点算子");
10 通过指定kafka地址与分组id参数生成kafka生产者FlinkKafkaProducer011对象,将上一步处理结果发送到Flink sink对象中
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(conf.getKafkaCluster(), conf.getCalcResultTopic(), new SimpleStringSchema());
resultStream.map(new MapFunction<MeasuringSignal, String>() {
@Override
public String map(MeasuringSignal value) throws Exception {
return TSParser.parse(value);
}
}).addSink(producer).name("Kafka Sink");
在Flink Job中开启Checkpoint功能,默认每隔500毫秒对Flink Job中的状态进行Checkpointing,以保证流处理过程发生故障后,也能够恢复。