shardingsphere+nacos增加数据源,热更新

  • Post author:
  • Post category:其他


最近遇到一个需求:在不重启项目的情况下动态增加数据源。比如已有 demo1、demo2,新增 demo3 之后无需重启即可生效。但 shardingsphere 不支持 nacos 注册中心,官网文档说只能通过 SPI 自己实现注册中心。目前的做法是:通过 nacos 监听配置变更,然后获取最新的 yaml 文件去更新 shardingsphere。项目用的是 springboot + mybatis plus + shardingsphere,这里只是简单写了一个 demo。

1.使用编排治理需要导入

<!-- Orchestration (governance) support for Sharding-JDBC.
     NOTE(review): no <version> is declared here; presumably it is managed elsewhere (parent POM / dependencyManagement) - confirm. -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-orchestration</artifactId>
</dependency>

<!-- Nacos configuration starter (version likewise not declared here). -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>

<!-- ShardingSphere database and table sharding -->
<dependency>
	<groupId>org.apache.shardingsphere</groupId>
	<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
	<version>4.0.1</version>
</dependency>

<dependency>
	<groupId>org.apache.shardingsphere</groupId>
	<artifactId>sharding-jdbc-spring-namespace</artifactId>
	<version>4.0.1</version>
</dependency>

<!-- Nacos client used directly by the Java code (NacosFactory / ConfigService). -->
<dependency>
	<groupId>com.alibaba.nacos</groupId>
	<artifactId>nacos-client</artifactId>
	<version>1.4.1</version>
</dependency>

<dependency>
	<groupId>org.apache.shardingsphere</groupId>
	<artifactId>sharding-orchestration-reg-nacos</artifactId>
	<version>4.0.1</version>
</dependency>

2.在META-INF/services下面创建org.apache.shardingsphere.orchestration.reg.api.RegistryCenter文件,文件里面的内容:

io.github.syske.springbootnacosdemo.shardingjdbc.LocalRegistryCenter

这一行是自定义注册中心实现类的全限定类名(包名+类名)。

3.实现RegistryCenter

public class LocalRegistryCenter implements RegistryCenter {

    public static Map<String, DataChangedEventListener> listeners = new ConcurrentHashMap<>();
    private RegistryCenterConfiguration config;
    private Properties properties;

    public static Map<String, String> values = new ConcurrentHashMap<>();


    @Override
    public void init(RegistryCenterConfiguration registryCenterConfiguration) {
        this.config = registryCenterConfiguration;
    }

    @Override
    public void watch(String key, DataChangedEventListener dataChangedEventListener) {
        if(null != dataChangedEventListener){
            listeners.put(key,  dataChangedEventListener);
        }
    }

    @Override
    public String get(String key) {
        return values.get(key);
    }
    
    @Override
    public String getDirectly(String key) {
        return values.get(key);
    }

    @Override
    public boolean isExisted(String key) {
        return values.containsKey(key);
    }

    @Override
    public List<String> getChildrenKeys(String s) {
        return null;
    }

    @Override
    public void persist(String key, String value) {
        values.put(key, value);
    }

    @Override
    public void update(String s, String s1) {

    }

    @Override
    public void persistEphemeral(String s, String s1) {

    }

    @Override
    public void close() {

    }

    @Override
    public void initLock(String s) {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public void tryRelease() {

    }

    @Override
    public String getType() {
        return "localRegistryCenter";
    }

    @Override
    public Properties getProperties() {
        return null;
    }

    @Override
    public void setProperties(Properties properties) {

    }

4.初始化shardingsphere。“localRegistryCenter”必须与自定义注册中心 getType() 方法的返回值一致。

orchestration-sharding-data-source 是初始化时指定的编排名称,之后在注册中心里获取数据源配置、数据分片规则时,节点路径都以它开头。
public class ShardingConfig {

    public static String SHARDING_RULE_TABLE_ORDER = "";
    /**
     * 分片规则的集成配置
     */
    private static TableRuleConfiguration orderRuleConfig(int num){
        SHARDING_RULE_TABLE_ORDER = "demo$->{0.."+num+"}.share$->{0..1}";
        // SHARDING_RULE_TABLE_ORDER = "demo${0.."+num+"}.share${0..1}";
        TableRuleConfiguration userRuleConfiguration = new TableRuleConfiguration("share", SHARDING_RULE_TABLE_ORDER);
        userRuleConfiguration.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("id", "share$->{id % 2}"));
        userRuleConfiguration.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("pro_id", "demo$->{pro_id}"));
        userRuleConfiguration.setKeyGeneratorConfig(new KeyGeneratorConfiguration("SNOWFLAKE", "id"));
        /*ShardingRuleConfiguration shardingRuleConfiguration = new ShardingRuleConfiguration();
        shardingRuleConfiguration.getTableRuleConfigs().add(userRuleConfiguration);*/

        return userRuleConfiguration;
    }

    /**
     * 数据源Sharding JDBC配置
     * @return
     */
    @Primary
    @Bean(name = "tradeSystemDataSource")
    public DataSource tradeSystemDataSource() throws Exception{
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        // 初始化配置服务,控制台通过示例代码自动获取下面参数
        String serverAddr = "http://192.168.1.16:8848";
        String dataId = "springboot-nacos-demo-dev.yaml";
        String group = "gsoacloud";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);

        //获取配置文件
        String content = configService.getConfig(dataId, group, 5000);
        Yaml yaml = new Yaml();
        Map<String, Object> map = yaml.load(content);
        Map<String, Object> springMap = (Map<String, Object>) map.get("spring");
        Map<String, Object> shareMap = (Map<String, Object>) springMap.get("shardingsphere");
        // System.out.println(shareMap.toString());
        Map<String, Object> datasourceMap = (Map<String, Object>) shareMap.get("datasource");

        int i = 0;

        for (Map.Entry<String, Object> vo : datasourceMap.entrySet()) {
            if (i != 0) {
                DatasourceDTO datasourceDTO = JSONObject.parseObject(JSONObject.toJSONString(vo.getValue()), DatasourceDTO.class);
                DruidDataSource druid = new DruidDataSource();
                druid.setDriverClassName(datasourceDTO.getDriverClassName());
                druid.setUrl(datasourceDTO.getUrl());
                druid.setUsername(datasourceDTO.getUsername());
                druid.setPassword(datasourceDTO.getPassword());
                dataSourceMap.put(vo.getKey(), druid);
            }
            i++;
        }

        ShardingRuleConfiguration shardJdbcConfig = new ShardingRuleConfiguration();
        shardJdbcConfig.getTableRuleConfigs().add(orderRuleConfig(i -2));
        //shardJdbcConfig.setDefaultDataSourceName("demo0");

        Properties props = new Properties();
        //打印sql语句,生产环境关闭减少日志量
        props.setProperty("sql.show",Boolean.TRUE.toString());

        // 服务编排配置, 加入本地注册中心配置类
        OrchestrationConfiguration orchestrationConfig = new OrchestrationConfiguration(
                "orchestration-sharding-data-source", new RegistryCenterConfiguration("localRegistryCenter"),
                false);

        DataSource dataSource = OrchestrationShardingDataSourceFactory.createDataSource(dataSourceMap, shardJdbcConfig, props,
                orchestrationConfig);

        return dataSource;
    }

    @Bean(name = "shardSqlSessionFactory")
    public MybatisSqlSessionFactoryBean shardSqlSessionFactory(@Qualifier("tradeSystemDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean  bean = new MybatisSqlSessionFactoryBean ();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper*.xml"));
        return bean;
    }

    /**
     * 替换sharding里的分片规则
     */
    public static void replaceActualDataNodes(String newRule){
        // 获取已有的配置
        String rules = LocalRegistryCenter.values
                .get("/orchestration-sharding-data-source/config/schema/logic_db/rule");
        // 修改为新的分片规则
        // rules = newRule;
        String rule = rules.replace(SHARDING_RULE_TABLE_ORDER, newRule);
        LocalRegistryCenter.listeners.get("/orchestration-sharding-data-source/config/schema")
                .onChange(new DataChangedEvent(
                        "/orchestration-sharding-data-source/config/schema/logic_db/rule",
                        rule, DataChangedEvent.ChangedType.UPDATED));
        LocalRegistryCenter.values.put("/orchestration-sharding-data-source/config/schema/logic_db/rule",rule);
        SHARDING_RULE_TABLE_ORDER = newRule;
    }
}

5.通过实现InitializingBean添加nacos监听,然后获取nacos配置文件,更新到shardingsphere

/**
 * Hooks a Nacos configuration listener into application start-up so that every change of the
 * watched configuration triggers {@link DynamicShardingService#dynamicSharding()}.
 */
@Configuration
public class DataSourceConfig extends DruidDataSource implements InitializingBean {

    /**
     * Registers the Nacos listener; a failure is printed and does not abort start-up.
     */
    @Override
    public void afterPropertiesSet() {
        try {
            nacosListener();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Creates a Nacos config client and subscribes to the application's configuration file.
     *
     * @throws NacosException when the client cannot be created or the listener cannot be added
     */
    public void nacosListener() throws NacosException {
        Properties nacosProps = new Properties();
        nacosProps.put("serverAddr", "http://192.168.1.16:8848");
        ConfigService client = NacosFactory.createConfigService(nacosProps);

        Listener refreshOnChange = new Listener() {
            @Override
            public Executor getExecutor() {
                // No dedicated executor is supplied for the callback.
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                // The pushed content is not used; the refresh routine is invoked without arguments.
                try {
                    DynamicShardingService.dynamicSharding();
                } catch (Exception failure) {
                    failure.printStackTrace();
                }
            }
        };

        client.addListener("springboot-nacos-demo-dev.yaml", "gsoacloud", refreshOnChange);
    }
}

5.DynamicShardingService方法刷新shardingsphere

/**
 * Re-reads the data source definitions from Nacos and pushes them, together with a matching
 * actual-data-nodes expression, into the running orchestration sharding data source.
 */
public class DynamicShardingService {

    /**
     * Returns a nested YAML section, failing with a descriptive message when it is absent.
     *
     * @param parent parsed parent mapping, may be {@code null}
     * @param key name of the nested mapping
     * @return the nested mapping
     * @throws IllegalStateException when the section is missing or is not a mapping
     */
    @SuppressWarnings("unchecked") // guarded by the instanceof check; YAML mappings have string keys here
    private static Map<String, Object> section(Map<String, Object> parent, String key) {
        Object child = null == parent ? null : parent.get(key);
        if (!(child instanceof Map)) {
            throw new IllegalStateException("Missing '" + key + "' section in the Nacos configuration");
        }
        return (Map<String, Object>) child;
    }

    /**
     * Refreshes data sources and sharding rule from the current Nacos configuration.
     *
     * <p>Every failure is logged and swallowed, so the running data source is left as it was
     * whenever the new configuration cannot be read or is incomplete.
     *
     * <p>NOTE(review): {@code log} is not declared in this class; it is presumably supplied by
     * a logging annotation or field outside this snippet - confirm.
     */
    public static void dynamicSharding() {
        ConfigService configService = null;
        try {
            // Nacos connection parameters
            String serverAddr = "http://192.168.1.16:8848";
            String dataId = "springboot-nacos-demo-dev.yaml";
            String group = "gsoacloud";
            Properties properties = new Properties();
            properties.put("serverAddr", serverAddr);
            configService = NacosFactory.createConfigService(properties);

            // Fetch and parse the configuration file.
            String content = configService.getConfig(dataId, group, 5000);
            if (null == content) {
                throw new IllegalStateException("Nacos returned no content for dataId=" + dataId + ", group=" + group);
            }
            // NOTE(review): Yaml.load can instantiate arbitrary types from tagged YAML. The content
            // comes from the Nacos server; consider a SafeConstructor if that source is not trusted.
            Map<String, Object> root = new Yaml().load(content);
            Map<String, Object> datasourceMap = section(section(section(root, "spring"), "shardingsphere"), "datasource");

            // Only mapping-valued entries are data source definitions. The original code skipped
            // the first entry by position, which breaks as soon as the key order changes.
            Map<String, DataSourceConfiguration> dataSourceConfigMap = new HashMap<>();
            for (Map.Entry<String, Object> vo : datasourceMap.entrySet()) {
                if (!(vo.getValue() instanceof Map)) {
                    continue;
                }
                DatasourceDTO datasourceDTO = JSONObject.parseObject(JSONObject.toJSONString(vo.getValue()), DatasourceDTO.class);
                DruidDataSource druid = new DruidDataSource();
                druid.setDriverClassName(datasourceDTO.getDriverClassName());
                druid.setUrl(datasourceDTO.getUrl());
                druid.setUsername(datasourceDTO.getUsername());
                druid.setPassword(datasourceDTO.getPassword());
                dataSourceConfigMap.put(vo.getKey(), DataSourceConfiguration.getDataSourceConfiguration(druid));
            }
            if (dataSourceConfigMap.isEmpty()) {
                // Renewing with nothing would leave the application without any data source.
                throw new IllegalStateException("No data source definitions found in " + dataId + "; keeping current data sources");
            }

            // Look up the orchestration data source registered in the Spring context.
            OrchestrationShardingDataSource dataSource = SpringContextUtil.getBean("tradeSystemDataSource", OrchestrationShardingDataSource.class);

            // Replace the data source configuration.
            dataSource.renew(new DataSourceChangedEvent(
                    "/orchestration-sharding-data-source/config/schema/logic_db/datasource",
                    dataSourceConfigMap));

            // Replace the sharding rule; data sources are expected to be named demo0..demo<n-1>.
            String newRule = "demo${0.." + (dataSourceConfigMap.size() - 1) + "}.share${0..1}";
            ShardingConfig.replaceActualDataNodes(newRule);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // A client is created per invocation; shut it down so repeated refreshes do not leak clients.
            if (null != configService) {
                try {
                    configService.shutDown();
                } catch (Exception ignored) {
                    // best effort: nothing useful can be done if shutdown fails
                }
            }
        }
    }
}

/orchestration-sharding-data-source/ 这个前缀就是初始化数据源时指定的编排名称。

/orchestration-sharding-data-source/config/schema/logic_db/datasource 这个节点保存的是数据源配置。
/orchestration-sharding-data-source/config/schema/logic_db/rule 这个节点保存的是数据分片规则。

6.SpringContextUtil

/**
 * Static access point to the Spring {@link ApplicationContext} for code that is not itself
 * managed by Spring.
 */
@Component
@RefreshScope
public class SpringContextUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext = null;

    /**
     * Stores the context the first time it is supplied; later calls leave the stored one untouched.
     *
     * @param applicationContext context supplied by Spring
     * @throws BeansException declared by the callback contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringContextUtil.applicationContext != null) {
            return;
        }
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * @return the stored context, or {@code null} if none has been supplied yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param maxloglength bean name
     * @return the bean instance
     */
    public static Object getBean(String maxloglength) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(maxloglength);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz required bean type
     * @param <T> bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param maxloglength bean name
     * @param clazz required bean type
     * @param <T> bean type
     * @return the bean instance
     */
    public static <T> T getBean(String maxloglength, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(maxloglength, clazz);
    }
}

7.在nacos里面加上

# Orchestration settings added to the Nacos-hosted configuration.
# NOTE(review): the Java code in this article creates the registry with type "localRegistryCenter",
# while this snippet declares type "nacos" - confirm which one is intended.
orchestration:

        name: orchestration_ds # instance name

        overwrite: true

        registry:

                type: nacos

                namespace: public

                serverLists: 192.168.1.16:8848



版权声明:本文为YoungHK原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。