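I recently ran into a requirement: add data sources dynamically, without restarting the project. For example, with demo1 and demo2 already configured, adding demo3 should take effect without a restart. However, ShardingSphere does not support Nacos as a registry center; according to the official documentation, the only option is to implement a registry center yourself through SPI. My current approach is to listen for changes in Nacos, fetch the YAML file, and use it to update ShardingSphere. The project uses Spring Boot + MyBatis-Plus + ShardingSphere, and what follows is only a simple demo.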
1. Orchestration governance requires the following dependencies:
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-orchestration</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<!-- ShardingSphere database and table sharding -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-namespace</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-orchestration-reg-nacos</artifactId>
<version>4.0.1</version>
</dependency>
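2. Under META-INF/services, create a file named org.apache.shardingsphere.orchestration.reg.api.RegistryCenter with the following content:
io.github.syske.springbootnacosdemo.shardingjdbc.LocalRegistryCenter
This is the fully qualified class name of your own registry center implementation.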
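As far as I understand, ShardingSphere 4.x finds the registry center by loading every implementation listed in that services file through Java's ServiceLoader and picking the one whose getType() matches the configured type. The sketch below models only that lookup in plain Java. `SpiLookup`, `TypedService` and `findByType` are hypothetical names for illustration, not ShardingSphere classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class SpiLookup {

    /** Hypothetical stand-in for an SPI interface that exposes a type name. */
    public interface TypedService {
        String getType();
    }

    /** Returns the first candidate whose getType() equals the requested type. */
    public static <T extends TypedService> Optional<T> findByType(Iterable<T> candidates, String type) {
        for (T candidate : candidates) {
            if (type.equals(candidate.getType())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // In a real application the candidates would come from ServiceLoader.load(...),
        // which is itself an Iterable; here two lambdas stand in for loaded providers.
        List<TypedService> loaded = Arrays.<TypedService>asList(
                () -> "zookeeper",
                () -> "localRegistryCenter");
        System.out.println(findByType(loaded, "localRegistryCenter").isPresent());
        System.out.println(findByType(loaded, "nacos").isPresent());
    }
}
```

The practical consequence is that the string returned by getType() and the type passed to RegistryCenterConfiguration must match exactly.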
3. Implement RegistryCenter
public class LocalRegistryCenter implements RegistryCenter {
public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();
private RegistryCenterConfiguration config;
private Properties properties;
public static Map<String, String> values = new ConcurrentHashMap<>();
@Override
public void init(RegistryCenterConfiguration registryCenterConfiguration) {
this.config = registryCenterConfiguration;
}
@Override
public void watch(String key, DataChangedEventListener dataChangedEventListener) {
if(null != dataChangedEventListener){
listeners.put(key, dataChangedEventListener);
}
}
@Override
public String get(String key) {
return values.get(key);
}
@Override
public String getDirectly(String key) {
return values.get(key);
}
@Override
public boolean isExisted(String key) {
return values.containsKey(key);
}
@Override
public List<String> getChildrenKeys(String s) {
return null;
}
@Override
public void persist(String key, String value) {
values.put(key, value);
}
@Override
public void update(String s, String s1) {
}
@Override
public void persistEphemeral(String s, String s1) {
}
@Override
public void close() {
}
@Override
public void initLock(String s) {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public void tryRelease() {
}
@Override
public String getType() {
return "localRegistryCenter";
}
@Override
public Properties getProperties() {
return properties;
}
@Override
public void setProperties(Properties properties) {
this.properties = properties;
}
}
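Note that this registry only stores values and listeners; nothing notifies the listeners on its own, which is why step 4 calls onChange by hand. The store-then-notify flow can be sketched in plain Java as follows. `InMemoryRegistry`, `ChangeListener` and `persistAndNotify` are hypothetical, simplified names and not part of the ShardingSphere API; the prefix match mirrors how a listener watching the schema root receives events for keys below it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryRegistry {

    /** Hypothetical simplified listener: receives the changed key and its new value. */
    public interface ChangeListener {
        void onChange(String key, String value);
    }

    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, ChangeListener> listeners = new ConcurrentHashMap<>();

    /** Registers a listener for a key prefix, replacing any previous one for that prefix. */
    public void watch(String keyPrefix, ChangeListener listener) {
        listeners.put(keyPrefix, listener);
    }

    public String get(String key) {
        return values.get(key);
    }

    /** Stores the value, then notifies every listener whose watched prefix covers the key. */
    public void persistAndNotify(String key, String value) {
        values.put(key, value);
        for (Map.Entry<String, ChangeListener> entry : listeners.entrySet()) {
            if (key.startsWith(entry.getKey())) {
                entry.getValue().onChange(key, value);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        List<String> received = new ArrayList<>();
        registry.watch("/orchestration-sharding-data-source/config/schema",
                (key, value) -> received.add(key));
        registry.persistAndNotify(
                "/orchestration-sharding-data-source/config/schema/logic_db/rule", "new rule text");
        System.out.println(received);
    }
}
```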
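4. Initialize ShardingSphere. "localRegistryCenter" is the type returned by getType() in the registry center above.
The name orchestration-sharding-data-source is specified here; afterwards, the registry keys for the data sources and the sharding rule all start with it.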
@Configuration // assumed: needed so that Spring processes the @Bean methods below
public class ShardingConfig {
public static String SHARDING_RULE_TABLE_ORDER = "";
/**
* Builds the sharding rule configuration
*/
private static TableRuleConfiguration orderRuleConfig(int num){
SHARDING_RULE_TABLE_ORDER = "demo$->{0.."+num+"}.share$->{0..1}";
// SHARDING_RULE_TABLE_ORDER = "demo${0.."+num+"}.share${0..1}";
TableRuleConfiguration userRuleConfiguration = new TableRuleConfiguration("share", SHARDING_RULE_TABLE_ORDER);
userRuleConfiguration.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("id", "share$->{id % 2}"));
userRuleConfiguration.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("pro_id", "demo$->{pro_id}"));
userRuleConfiguration.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "id"));
/*ShardingRuleConfiguration shardingRuleConfiguration = new ShardingRuleConfiguration();
shardingRuleConfiguration.getTableRuleConfigs().add(userRuleConfiguration);*/
return userRuleConfiguration;
}
/**
* Sharding-JDBC data source configuration
* @return the orchestration-backed sharding data source
*/
@Primary
@Bean(name = "tradeSystemDataSource")
public DataSource tradeSystemDataSource() throws Exception{
Map<String, DataSource> dataSourceMap = new HashMap<>();
// Initialize the config service; the parameters below come from the sample code shown in the Nacos console
String serverAddr = "http://192.168.1.16:8848";
String dataId = "springboot-nacos-demo-dev.yaml";
String group = "gsoacloud";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
// Fetch the configuration file
String content = configService.getConfig(dataId, group, 5000);
Yaml yaml = new Yaml();
Map<String, Object> map = yaml.load(content);
Map<String, Object> springMap = (Map<String, Object>) map.get("spring");
Map<String, Object> shareMap = (Map<String, Object>) springMap.get("shardingsphere");
// System.out.println(shareMap.toString());
Map<String, Object> datasourceMap = (Map<String, Object>) shareMap.get("datasource");
int i = 0;
for (Map.Entry<String, Object> vo : datasourceMap.entrySet()) {
if (i != 0) {
DatasourceDTO datasourceDTO = JSONObject.parseObject(JSONObject.toJSONString(vo.getValue()), DatasourceDTO.class);
DruidDataSource druid = new DruidDataSource();
druid.setDriverClassName(datasourceDTO.getDriverClassName());
druid.setUrl(datasourceDTO.getUrl());
druid.setUsername(datasourceDTO.getUsername());
druid.setPassword(datasourceDTO.getPassword());
dataSourceMap.put(vo.getKey(), druid);
}
i++;
}
ShardingRuleConfiguration shardJdbcConfig = new ShardingRuleConfiguration();
shardJdbcConfig.getTableRuleConfigs().add(orderRuleConfig(i -2));
//shardJdbcConfig.setDefaultDataSourceName("demo0");
Properties props = new Properties();
// Print SQL statements; turn this off in production to reduce log volume
props.setProperty("sql.show",Boolean.TRUE.toString());
// Orchestration configuration, using the local registry center
OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(
"orchestration-sharding-data-source", new RegistryCenterConfiguration("localRegistryCenter"),
false);
DataSource dataSource = OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardJdbcConfig, props,
orchestrationConfig);
return dataSource;
}
@Bean(name = "shardSqlSessionFactory")
public MybatisSqlSessionFactoryBean shardSqlSessionFactory(@Qualifier("tradeSystemDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean ();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper*.xml"));
return bean;
}
/**
* Replaces the sharding rule held by ShardingSphere
*/
public static void replaceActualDataNodes(String newRule){
// Read the existing configuration
String rules = LocalRegistryCenter.values
.get("/orchestration-sharding-data-source/config/schema/logic_db/rule");
// Swap in the new sharding rule
// rules = newRule;
String rule = rules.replace(SHARDING_RULE_TABLE_ORDER, newRule);
LocalRegistryCenter.listeners.get("/orchestration-sharding-data-source/config/schema")
.onChange(new DataChangedEvent(
"/orchestration-sharding-data-source/config/schema/logic_db/rule",
rule, DataChangedEvent.ChangedType.UPDATED));
LocalRegistryCenter.values.put("/orchestration-sharding-data-source/config/schema/logic_db/rule",rule);
SHARDING_RULE_TABLE_ORDER = newRule;
}
}
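The only part of the rule that changes when a data source is added is the upper bound in the actual-data-nodes expression. Because the loop skips the first entry under datasource (presumably the names entry) and i ends up equal to the total number of entries, i - 2 is the highest data source index. A minimal sketch of that arithmetic and of the string swap, where `ActualDataNodes`, `forDataSourceCount` and `replaceIn` are hypothetical helper names:

```java
class ActualDataNodes {

    /** Builds e.g. "demo$->{0..2}.share$->{0..1}" for three data sources demo0..demo2. */
    public static String forDataSourceCount(int dataSourceCount) {
        if (dataSourceCount < 1) {
            throw new IllegalArgumentException("At least one data source is required");
        }
        return "demo$->{0.." + (dataSourceCount - 1) + "}.share$->{0..1}";
    }

    /** Swaps the old expression for the new one inside the stored rule text. */
    public static String replaceIn(String ruleText, String oldExpression, String newExpression) {
        if (!ruleText.contains(oldExpression)) {
            // Fail loudly instead of silently keeping the stale rule.
            throw new IllegalStateException("Old expression not found in rule text");
        }
        // String.replace matches literally, so "$" and "{" need no escaping.
        return ruleText.replace(oldExpression, newExpression);
    }

    public static void main(String[] args) {
        String oldExpression = forDataSourceCount(2);
        String newExpression = forDataSourceCount(3);
        String stored = "actualDataNodes: " + oldExpression;
        System.out.println(replaceIn(stored, oldExpression, newExpression));
    }
}
```

Checking that the old expression is really present guards against the case where the remembered expression and the stored rule text have drifted apart, in which case a plain replace would do nothing.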
5. Implement InitializingBean to add a Nacos listener, then fetch the Nacos configuration file and apply it to ShardingSphere
@Configuration
public class DataSourceConfig extends DruidDataSource implements InitializingBean {
@Override
public void afterPropertiesSet() {
try {
nacosListener();
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Adds the Nacos listener
**/
public void nacosListener() throws NacosException {
String serverAddr = "http://192.168.1.16:8848";
String dataId = "springboot-nacos-demo-dev.yaml";
String group = "gsoacloud";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
try {
DynamicShardingService.dynamicSharding();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public Executor getExecutor() {
return null;
}
});
}
}
5. The DynamicShardingService method refreshes ShardingSphere
public class DynamicShardingService {
public static void dynamicSharding() {
try {
Map<String, DataSource> dataSourceMap = new HashMap<>();
// Initialize the config service; the parameters below come from the sample code shown in the Nacos console
String serverAddr = "http://192.168.1.16:8848";
String dataId = "springboot-nacos-demo-dev.yaml";
String group = "gsoacloud";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
// Fetch the configuration file
String content = configService.getConfig(dataId, group, 5000);
Yaml yaml = new Yaml();
Map<String, Object> map = yaml.load(content);
Map<String, Object> springMap = (Map<String, Object>) map.get("spring");
Map<String, Object> shareMap = (Map<String, Object>) springMap.get("shardingsphere");
Map<String, Object> datasourceMap = (Map<String, Object>) shareMap.get("datasource");
int i = 0;
for (Map.Entry<String, Object> vo : datasourceMap.entrySet()) {
if (i != 0) {
DatasourceDTO datasourceDTO = JSONObject.parseObject(JSONObject.toJSONString(vo.getValue()), DatasourceDTO.class);
DruidDataSource druid = new DruidDataSource();
druid.setDriverClassName(datasourceDTO.getDriverClassName());
druid.setUrl(datasourceDTO.getUrl());
druid.setUsername(datasourceDTO.getUsername());
druid.setPassword(datasourceDTO.getPassword());
dataSourceMap.put(vo.getKey(), druid);
}
i++;
}
Map<String, DataSourceConfiguration> dataSourceConfigMap = new HashMap<>();
for (String key : dataSourceMap.keySet()) {
dataSourceConfigMap.put(key, DataSourceConfiguration.getDataSourceConfiguration(dataSourceMap.get(key)));
}
// Get the orchestration data source from the Spring context
OrchestrationShardingDataSource dataSource = SpringContextUtil.getBean("tradeSystemDataSource", OrchestrationShardingDataSource.class);
// Reset the data source configuration
dataSource.renew(new DataSourceChangedEvent(
"/orchestration-sharding-data-source/config/schema/logic_db/datasource",
dataSourceConfigMap));
// Reset the sharding rule
String newRule = "demo${0.."+(i-2)+"}.share${0..1}";
ShardingConfig.replaceActualDataNodes(newRule);
// Print the sharding rule to check whether the update succeeded
/*String rules = LocalRegistryCenter.values
.get("/orchestration-sharding-data-source/config/schema/logic_db/rule");
System.out.println(rules);
// Print the data sources to check whether the update succeeded
String datasource = LocalRegistryCenter.values
.get("/orchestration-sharding-data-source/config/schema/logic_db/datasource");
System.out.println(datasource);*/
} catch (Exception e) {
// "log" is assumed to be an SLF4J logger field (for example via Lombok's @Slf4j on this class)
log.error(e.getMessage(), e);
}
}
}
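Both tradeSystemDataSource() and dynamicSharding() skip the first entry under spring.shardingsphere.datasource by position, which only works if the non-data-source entry (presumably names) is written first in the YAML. A sketch of a less order-dependent alternative is to keep only the entries whose value is itself a map. `DataSourceEntries` and `extract` are hypothetical names, and the input is assumed to be the nested map that yaml.load already returns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DataSourceEntries {

    /** Walks root -> spring -> shardingsphere -> datasource and keeps only map-valued entries. */
    @SuppressWarnings("unchecked")
    public static Map<String, Map<String, Object>> extract(Map<String, Object> root) {
        Map<String, Object> node = root;
        for (String key : new String[] {"spring", "shardingsphere", "datasource"}) {
            Object child = node.get(key);
            if (!(child instanceof Map)) {
                throw new IllegalArgumentException("Missing or non-map section: " + key);
            }
            node = (Map<String, Object>) child;
        }
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            // Scalar entries such as a comma-separated "names" list are skipped.
            if (entry.getValue() instanceof Map) {
                result.put(entry.getKey(), (Map<String, Object>) entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> demo0 = new LinkedHashMap<>();
        demo0.put("url", "jdbc:mysql://localhost:3306/demo0"); // placeholder URL
        Map<String, Object> datasource = new LinkedHashMap<>();
        datasource.put("demo0", demo0);
        datasource.put("names", "demo0"); // deliberately not first
        Map<String, Object> shardingsphere = new LinkedHashMap<>();
        shardingsphere.put("datasource", datasource);
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("shardingsphere", shardingsphere);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        System.out.println(extract(root).keySet());
    }
}
```

With this shape, the number of data sources is simply the size of the returned map, so the highest index no longer depends on a loop counter.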
/orchestration-sharding-data-source/ is the name specified when the data source was initialized
/orchestration-sharding-data-source/config/schema/logic_db/datasource holds the data source configuration
/orchestration-sharding-data-source/config/schema/logic_db/rule holds the sharding rule
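These paths are repeated as string literals in several places in the demo, so a typo in any one of them would silently break the refresh. A small sketch that derives them from the orchestration name and the schema name; `RegistryPaths` and its methods are hypothetical helpers, and the layout is simply the one observed in the paths above.

```java
class RegistryPaths {

    /** Root under which a listener for schema changes is registered. */
    public static String schemaRoot(String orchestrationName) {
        return "/" + orchestrationName + "/config/schema";
    }

    /** Key that holds the data source configuration of one schema. */
    public static String dataSource(String orchestrationName, String schemaName) {
        return schemaRoot(orchestrationName) + "/" + schemaName + "/datasource";
    }

    /** Key that holds the sharding rule of one schema. */
    public static String rule(String orchestrationName, String schemaName) {
        return schemaRoot(orchestrationName) + "/" + schemaName + "/rule";
    }

    public static void main(String[] args) {
        String name = "orchestration-sharding-data-source";
        System.out.println(schemaRoot(name));
        System.out.println(dataSource(name, "logic_db"));
        System.out.println(rule(name, "logic_db"));
    }
}
```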
6.SpringContextUtil
@Component
@RefreshScope
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextUtil.applicationContext == null) {
SpringContextUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
7. Add the following to the configuration in Nacos
orchestration:
  name: orchestration_ds # instance name
  overwrite: true
  registry:
    type: nacos
    namespace: public
    serverLists: 192.168.1.16:8848
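Copyright notice: this is an original article by YoungHK, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.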