1 前言
当我们开发Flink程序的时候一些配置或一些规则可能需要根据系统要求或需求发生变更,一般常见的做法我们可以开启通过将配置或规则存储到数据库(如MYSQL,REDIS),然后通过在程序中我们自己通过JDBC连接的方式去读取规则。这样做比较麻烦,我们可以通过阿里开源的NACOS来协助我们实现这样的功能,NACOS 官网是如下定义的:
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
具体我们可以访问nacos中文官方文档
https://nacos.io/zh-cn/docs/what-is-nacos.html
2 环境准备
2.1 nacos下载地址
https://github.com/alibaba/nacos/releases
2.2 安装
学习时可以单机安装,因此不再赘述
2.3 启动
在window下通过点击bin/startup.cmd启动,启动后通过 http://localhost:8848/nacos/index.html地址验证是否启动成功
3 Flink整合Nacos实现动态服务配置更新
自定义一个Source读取nacos配置信息,在nacos配置界面修改配置信息,看是否能够正确读取
主类代码如下:
package com.hailong.flink.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
* Created by 袁海龙 on 2020/2/26.
* https://www.bilibili.com/video/av90742627
*/
public class FlinkNacosConfigTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
//测试的时候设置为1,可以方便查看输出日志
env.setParallelism(1);
String serverAddr="localhost";
String dataId="test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties); //服务配置中心
String content = configService.getConfig(dataId, group, 5000); //获取配置
System.out.println("config content - - -> "+content);
//定义一个source
DataStream ds=env.addSource(new FlinkNacosSource());
//输出到控制台
ds.print();
env.execute();
}
}
FLink Source代码如下:
package com.hailong.flink.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @author 袁海龙
* 自定义source 读取配置信息
*/
public class FlinkNacosSource extends RichSourceFunction<String> {
Properties properties=new Properties();
ConfigService configService;
String config;
String dataId="test";
String group="DEFAULT_GROUP";
String serverAddr="localhost";
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
properties.put("serverAddr",serverAddr);
configService= NacosFactory.createConfigService(properties);
config=configService.getConfig(dataId,group,5000);
//开启监听器监听配置中心配置数据
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
config=configMsg;
System.out.println("open listener receive - - - >" +configMsg);
}
});
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
for(;;){
Thread.sleep(300);
System.out.println("obtain config infomation - - - > "+config);
//System.out.println("obtain config infomation - - - > "+configService.getConfig(dataId,group,5000));
ctx.collect(String.valueOf(System.currentTimeMillis()));
}
}
@Override
public void cancel() {
}
}
1 在source的open方法里面我们添加了一个nacos的监听来监听配置服务的更新
2 在run方法中读取更新的配置
到这里代码就已经完成了,我们可以启动Flink程序,并在Nacos的配置界面修改配置内容查看是否配置实现了动态更新
4 测试案例代码地址
https://github.com/314649558/flink-learning/tree/master/flink-nacos-learning