Springboot+shardingsphere实现一库多表分表(对多个表进行分表操作)

  • Post author:
  • Post category:其他


使用 Spring Boot + ShardingSphere 对同一个数据库中的多个表进行分表操作。本文针对 MySQL 数据库中的两个表进行分表,实现根据分表字段自动创建不存在的表,以及根据分表字段自动路由到对应的表进行查询。所使用的主要技术栈为 ShardingSphere + SPI。

maven依赖

        <!-- ShardingSphere-JDBC: the configuration class below builds its sharded DataSource with it. -->
        <dependency>
			<groupId>org.apache.shardingsphere</groupId>
			<artifactId>shardingsphere-jdbc-core</artifactId>
			<version>5.0.0</version>
		</dependency>

		<!-- Velocity template engine: the sharding algorithms use it to render the CREATE TABLE script for new shard tables. -->
		<dependency>
			<groupId>velocity</groupId>
			<artifactId>velocity</artifactId>
			<version>1.4</version>
		</dependency>

config文件

@Slf4j
@Configuration
public class ShardingJdbcConfig {

  private final DataSourceProperties dataSourceProperties;

  public  static HikariDataSource hikariDataSource;

  static {
    Velocity.setProperty("resource.loader", "class");
    Velocity.setProperty("class.resource.loader.class",
        ClasspathResourceLoader.class.getName());
    try {
      Velocity.init();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public ShardingJdbcConfig(DataSourceProperties dataSourceProperties) {
    this.dataSourceProperties = dataSourceProperties;
  }

  public HikariDataSource jdbcDatasource() {
    HikariDataSource dataSource = new HikariDataSource();
    BeanUtils.copyProperties(dataSourceProperties,dataSource);
    dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
    dataSource.setJdbcUrl(dataSourceProperties.getUrl());
    dataSource.setUsername(dataSourceProperties.getUsername());
    dataSource.setPassword(dataSourceProperties.getPassword());
    dataSource.setConnectionTimeout(1000 * 60);
    hikariDataSource = dataSource;
    return dataSource;
  }

  @Bean
  @Primary
  @Qualifier("ds")
  public DataSource dataSource() throws SQLException {
    Map<String, DataSource> dataSourceMap = new HashMap<>();
    HikariDataSource dataSource = new HikariDataSource();
    dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
    dataSource.setJdbcUrl(dataSourceProperties.getUrl());
    dataSource.setUsername(dataSourceProperties.getUsername());
    dataSource.setPassword(dataSourceProperties.getPassword());
    dataSourceMap.put("ds", dataSource);

    // 配置 record 表规则
    ShardingTableRuleConfiguration recordTableRuleConfiguration = new ShardingTableRuleConfiguration("t_test_table", getActualDataNodes());

    // 配置分表策略
    recordTableRuleConfiguration.setTableShardingStrategy(new StandardShardingStrategyConfiguration("create_time", KafkaTestTableStandardShardingAlgorithm.TYPE));

    // 配置分片规则
    ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
    shardingRuleConfig.getTables().add(recordTableRuleConfiguration);

    // 配置分表算法
    Properties tableShardingAlgorithmProps = new Properties();
    tableShardingAlgorithmProps.put("jdbcDatasource", jdbcDatasource());
    shardingRuleConfig.getShardingAlgorithms().put(KafkaTestTableStandardShardingAlgorithm.TYPE,
        new ShardingSphereAlgorithmConfiguration(KafkaTestTableStandardShardingAlgorithm.TYPE, tableShardingAlgorithmProps)
    );

    // 配置 t_redp_face_match_event 表规则
    ShardingTableRuleConfiguration recordTableRuleConfigurationRedp = new ShardingTableRuleConfiguration("t_kafka_test", getActualDataNodesRedp());
    // 配置分表策略
    recordTableRuleConfigurationRedp.setTableShardingStrategy(new StandardShardingStrategyConfiguration("create_time", KafkaTestTableStandardShardingAlgorithm.TYPE)
    );

    // 配置分片规则
    shardingRuleConfig.getTables().add(recordTableRuleConfigurationRedp);

    // 配置分表算法
    shardingRuleConfig.getShardingAlgorithms().put(KafkaTestTableStandardShardingAlgorithm.TYPE, new ShardingSphereAlgorithmConfiguration(KafkaTestTableStandardShardingAlgorithm.TYPE, tableShardingAlgorithmProps));


    List listConfig = new ArrayList<ShardingRuleConfiguration>(){
      {
        add(shardingRuleConfig);
      }
    };
    // 创建 ShardingSphereDataSource
    return ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, listConfig, tableShardingAlgorithmProps);
  }

  private String getActualDataNodes() {
      return "ds.t_test_table${2021..2099}0${1..9},ds.t_test_table${2021..2099}1${0..2}";
    }

  private String getActualDataNodesRedp() {
    return "ds.t_kafka_test${2021..2099}0${1..9},ds.t_kafka_test${2021..2099}1${0..2}";
  }

}

两个表的分表算法逻辑

/**
 * Standard sharding algorithm that routes rows to one physical table per month, named
 * {@code <logicTable><yyyyMM>} after the {@link Date} sharding value.
 *
 * <p>Physical tables that are not yet in the in-memory cache are created on demand from the
 * Velocity template {@code template/ddlKafkaTest.template}. The cache is seeded in
 * {@link #init()} with every table whose name matches {@code t_kafka_test%}.
 */
@Slf4j
public class KafkaTestTableStandardShardingAlgorithm implements StandardShardingAlgorithm<Date> {

    public static final String TYPE = "KafkaTestTableStandardShardingAlgorithm";

    /** Length of the {@code yyyyMM} suffix appended to the logic table name. */
    private static final int SUFFIX_LENGTH = 6;

    /** Data sources on which the tables are looked up and created. */
    private List<DataSource> dataSources;

    /** Names of the physical tables known to exist. */
    private final Set<String> TABLES = new ConcurrentHashSet<>();

    protected Properties props;
    protected String initDate;

    /** SimpleDateFormat is not thread-safe, so every thread keeps its own instance. */
    private static final ThreadLocal<DateFormat> dateformat = new ThreadLocal<>();

    /**
     * Routes a single sharding value to the table of its month, creating that table if it is
     * not known yet.
     *
     * @param collection    available target names (not used)
     * @param shardingValue logic table name and the date that selects the month
     * @return the physical table name {@code <logicTable><yyyyMM>}
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> shardingValue) {
        String tableName = shardingValue.getLogicTableName()
                + getDateFormatter().format(shardingValue.getValue());
        log.info("执行操作的表名{}", tableName);
        if (!TABLES.contains(tableName)) {
            createTable(tableName);
        }
        return tableName;
    }

    /**
     * Routes a date range to the monthly tables it covers.
     *
     * <p>Both endpoints are capped at the current time. A range without an upper bound ends
     * now. A range without a lower bound is routed to every known table of the logic table up
     * to the end month, plus the end month's own table so the result is never empty.
     *
     * @param collection         available target names (not used)
     * @param rangeShardingValue logic table name and the date range
     * @return the physical table names to query
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
        Range<Date> shardingKey = rangeShardingValue.getValueRange();
        String logicTableName = rangeShardingValue.getLogicTableName();
        Date now = new Date();

        // lowerEndpoint()/upperEndpoint() throw on an unbounded side, so check first.
        Date endTime = shardingKey.hasUpperBound() ? shardingKey.upperEndpoint() : now;
        if (endTime.after(now)) {
            endTime = now;
        }

        Collection<String> result = new LinkedHashSet<>();
        if (shardingKey.hasLowerBound()) {
            Date startTime = shardingKey.lowerEndpoint();
            if (startTime.after(now)) {
                startTime = now;
            }
            result.addAll(getRoutTable(logicTableName, startTime, endTime));
        } else {
            result.addAll(getExistingTablesUpTo(logicTableName, endTime));
            result.addAll(getRoutTable(logicTableName, endTime, endTime));
        }
        return result;
    }

    /** Picks up the pool published by {@code ShardingJdbcConfig} and seeds the table cache. */
    @Override
    public void init() {
        this.dataSources = new ArrayList<>();
        this.dataSources.add(ShardingJdbcConfig.hikariDataSource);
        this.syncTable();
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public Properties getProps() {
        return props;
    }

    @Override
    public void setProps(Properties props) {
        this.props = props;
    }

    /** Loads the names of the already existing tables into the cache; failures are logged. */
    private void syncTable() {
        String sql = "select TABLE_NAME from information_schema.TABLES " +
                "where TABLE_NAME like 't_kafka_test%'";
        for (DataSource dataSource : this.dataSources) {
            // The connection is a managed resource too, so it is returned to the pool.
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
                 ResultSet rs = preparedStatement.executeQuery()) {
                while (rs.next()) {
                    TABLES.add(rs.getString(1));
                }
            } catch (SQLException e) {
                log.error("sync table error:{}", e.getMessage(), e);
            }
        }
    }

    /**
     * Renders the DDL template for the given table name, runs it on every data source and
     * records the table in the cache. Failures are logged and the table is not cached.
     */
    private void createTable(String tableName) {
        try {
            Template template =
                    Velocity.getTemplate("template/ddlKafkaTest.template");
            VelocityContext context = new VelocityContext();
            context.put("tableName", tableName);
            StringWriter sw = new StringWriter();
            template.merge(context, sw);
            Resource resource = new ByteArrayResource(sw.toString().getBytes(StandardCharsets.UTF_8));
            for (DataSource dataSource : this.dataSources) {
                try (Connection connection = dataSource.getConnection()) {
                    ScriptUtils.executeSqlScript(connection, resource);
                }
            }
            TABLES.add(tableName);
        } catch (Exception e) {
            log.error("create table({}) error:{}", tableName, e.getMessage(), e);
        }
    }

    /** Returns the calling thread's {@code yyyyMM} formatter, creating it on first use. */
    private static DateFormat getDateFormatter(){
        DateFormat dateFormat = dateformat.get();
        if (dateFormat == null) {
            dateFormat = new SimpleDateFormat("yyyyMM");
            dateformat.set(dateFormat);
        }
        return dateFormat;
    }

    /**
     * Returns the monthly tables between the two dates (both months included), creating the
     * ones that are not in the cache. Returns an empty set when either date is null.
     */
    private Collection<String> getRoutTable(String logicTableName, Date startTime, Date endTime) {
        Set<String> rouTables = new HashSet<>();
        if (startTime != null && endTime != null) {
            for (String yearMonth : getRangeNameList(startTime, endTime)) {
                String tableName = logicTableName + yearMonth;
                if (!TABLES.contains(tableName)) {
                    createTable(tableName);
                }
                rouTables.add(tableName);
            }
        }
        return rouTables;
    }

    /**
     * Returns the cached tables named {@code <logicTableName><yyyyMM>} whose month is not
     * after the month of {@code endTime}.
     */
    private Collection<String> getExistingTablesUpTo(String logicTableName, Date endTime) {
        String endSuffix = getDateFormatter().format(endTime);
        Set<String> matches = new HashSet<>();
        for (String table : TABLES) {
            if (!table.startsWith(logicTableName)) {
                continue;
            }
            String suffix = table.substring(logicTableName.length());
            // yyyyMM strings of equal length order the same way as the months they denote.
            if (isYearMonthSuffix(suffix) && suffix.compareTo(endSuffix) <= 0) {
                matches.add(table);
            }
        }
        return matches;
    }

    /** Tells whether the text consists of exactly six digits. */
    private static boolean isYearMonthSuffix(String suffix) {
        if (suffix.length() != SUFFIX_LENGTH) {
            return false;
        }
        for (int i = 0; i < suffix.length(); i++) {
            if (!Character.isDigit(suffix.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Lists the {@code yyyyMM} names of every month from the start month to the end month. */
    private static List<String> getRangeNameList(Date startTime, Date endTime) {
        List<String> result = Lists.newArrayList();

        DateTime beginOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(startTime);
        DateTime endOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(endTime);

        while (beginOfMonth.getTime() <= endOfMonth.getTime()) {
            result.add(getDateFormatter().format(beginOfMonth.getTime()));
            // Advance the cursor by one month.
            beginOfMonth = beginOfMonth.offset(DateField.MONTH, 1);
        }
        return result;
    }
}
/**
 * Standard sharding algorithm that routes rows to one physical table per month, named
 * {@code <logicTable><yyyyMM>} after the {@link Date} sharding value.
 *
 * <p>Physical tables that are not yet in the in-memory cache are created on demand from the
 * Velocity template {@code template/ddl.template}. The cache is seeded in {@link #init()}
 * with every table whose name matches {@code t_test_table%}.
 */
@Slf4j
public class TestTableStandardShardingAlgorithm implements StandardShardingAlgorithm<Date> {

    public static final String TYPE = "TestTableStandardShardingAlgorithm";

    /** Length of the {@code yyyyMM} suffix appended to the logic table name. */
    private static final int SUFFIX_LENGTH = 6;

    /** Data sources on which the tables are looked up and created. */
    private List<DataSource> dataSources;

    /** Names of the physical tables known to exist. */
    private final Set<String> TABLES = new ConcurrentHashSet<>();

    protected Properties props;
    protected String initDate;

    /** SimpleDateFormat is not thread-safe, so every thread keeps its own instance. */
    private static final ThreadLocal<DateFormat> dateformat = new ThreadLocal<>();

    /**
     * Routes a single sharding value to the table of its month, creating that table if it is
     * not known yet.
     *
     * @param collection    available target names (not used)
     * @param shardingValue logic table name and the date that selects the month
     * @return the physical table name {@code <logicTable><yyyyMM>}
     */
    @Override
    public String doSharding(Collection<String> collection, PreciseShardingValue<Date> shardingValue) {
        // Format once and reuse the name for logging, the cache check and the result.
        String tableName = shardingValue.getLogicTableName()
                + getDateFormatter().format(shardingValue.getValue());
        log.info("执行操作的表名{}", tableName);
        if (!TABLES.contains(tableName)) {
            createTable(tableName);
        }
        return tableName;
    }

    /**
     * Routes a date range to the monthly tables it covers.
     *
     * <p>Both endpoints are capped at the current time. A range without an upper bound ends
     * now. A range without a lower bound is routed to every known table of the logic table up
     * to the end month, plus the end month's own table so the result is never empty.
     *
     * @param collection         available target names (not used)
     * @param rangeShardingValue logic table name and the date range
     * @return the physical table names to query
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Date> rangeShardingValue) {
        Range<Date> shardingKey = rangeShardingValue.getValueRange();
        String logicTableName = rangeShardingValue.getLogicTableName();
        Date now = new Date();

        // lowerEndpoint()/upperEndpoint() throw on an unbounded side, so check first.
        Date endTime = shardingKey.hasUpperBound() ? shardingKey.upperEndpoint() : now;
        if (endTime.after(now)) {
            endTime = now;
        }

        Collection<String> result = new LinkedHashSet<>();
        if (shardingKey.hasLowerBound()) {
            Date startTime = shardingKey.lowerEndpoint();
            if (startTime.after(now)) {
                startTime = now;
            }
            result.addAll(getRoutTable(logicTableName, startTime, endTime));
        } else {
            result.addAll(getExistingTablesUpTo(logicTableName, endTime));
            result.addAll(getRoutTable(logicTableName, endTime, endTime));
        }
        return result;
    }

    /** Picks up the pool published by {@code ShardingJdbcConfig} and seeds the table cache. */
    @Override
    public void init() {
        this.dataSources = new ArrayList<>();
        this.dataSources.add(ShardingJdbcConfig.hikariDataSource);
        this.syncTable();
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public Properties getProps() {
        return props;
    }

    @Override
    public void setProps(Properties props) {
        this.props = props;
    }

    /** Loads the names of the already existing tables into the cache; failures are logged. */
    private void syncTable() {
        String sql = "select TABLE_NAME from information_schema.TABLES " +
                "where TABLE_NAME like 't_test_table%'";
        for (DataSource dataSource : this.dataSources) {
            // The connection is a managed resource too, so it is returned to the pool.
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
                 ResultSet rs = preparedStatement.executeQuery()) {
                while (rs.next()) {
                    TABLES.add(rs.getString(1));
                }
            } catch (SQLException e) {
                log.error("sync table error:{}", e.getMessage(), e);
            }
        }
    }

    /**
     * Renders the DDL template for the given table name, runs it on every data source and
     * records the table in the cache. Failures are logged and the table is not cached.
     */
    private void createTable(String tableName) {
        try {
            Template template =
                    Velocity.getTemplate("template/ddl.template");
            VelocityContext context = new VelocityContext();
            context.put("tableName", tableName);
            StringWriter sw = new StringWriter();
            template.merge(context, sw);
            Resource resource =
                    new ByteArrayResource(sw.toString().getBytes(StandardCharsets.UTF_8));
            for (DataSource dataSource : this.dataSources) {
                try (Connection connection = dataSource.getConnection()) {
                    ScriptUtils.executeSqlScript(connection, resource);
                }
            }
            TABLES.add(tableName);
        } catch (Exception e) {
            log.error("create table({}) error:{}", tableName, e.getMessage(), e);
        }
    }

    /** Returns the calling thread's {@code yyyyMM} formatter, creating it on first use. */
    private static DateFormat getDateFormatter(){
        DateFormat dateFormat = dateformat.get();
        if (dateFormat == null) {
            dateFormat = new SimpleDateFormat("yyyyMM");
            dateformat.set(dateFormat);
        }
        return dateFormat;
    }

    /**
     * Returns the monthly tables between the two dates (both months included), creating the
     * ones that are not in the cache. Returns an empty set when either date is null.
     */
    private Collection<String> getRoutTable(String logicTableName, Date startTime, Date endTime) {
        Set<String> rouTables = new HashSet<>();
        if (startTime != null && endTime != null) {
            for (String yearMonth : getRangeNameList(startTime, endTime)) {
                String tableName = logicTableName + yearMonth;
                if (!TABLES.contains(tableName)) {
                    createTable(tableName);
                }
                rouTables.add(tableName);
            }
        }
        return rouTables;
    }

    /**
     * Returns the cached tables named {@code <logicTableName><yyyyMM>} whose month is not
     * after the month of {@code endTime}.
     */
    private Collection<String> getExistingTablesUpTo(String logicTableName, Date endTime) {
        String endSuffix = getDateFormatter().format(endTime);
        Set<String> matches = new HashSet<>();
        for (String table : TABLES) {
            if (!table.startsWith(logicTableName)) {
                continue;
            }
            String suffix = table.substring(logicTableName.length());
            // yyyyMM strings of equal length order the same way as the months they denote.
            if (isYearMonthSuffix(suffix) && suffix.compareTo(endSuffix) <= 0) {
                matches.add(table);
            }
        }
        return matches;
    }

    /** Tells whether the text consists of exactly six digits. */
    private static boolean isYearMonthSuffix(String suffix) {
        if (suffix.length() != SUFFIX_LENGTH) {
            return false;
        }
        for (int i = 0; i < suffix.length(); i++) {
            if (!Character.isDigit(suffix.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Lists the {@code yyyyMM} names of every month from the start month to the end month. */
    private static List<String> getRangeNameList(Date startTime, Date endTime) {
        List<String> result = Lists.newArrayList();

        DateTime beginOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(startTime);
        DateTime endOfMonth = cn.hutool.core.date.DateUtil.beginOfMonth(endTime);

        while (beginOfMonth.getTime() <= endOfMonth.getTime()) {
            result.add(getDateFormatter().format(beginOfMonth.getTime()));
            // Advance the cursor by one month.
            beginOfMonth = beginOfMonth.offset(DateField.MONTH, 1);
        }
        return result;
    }
}

数据库表模板文件

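模板文件目录如下(原文此处为截图):根据上面的代码,模板文件位于 classpath 的 template 目录下,分别是 template/ddl.template 和 template/ddlKafkaTest.template。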

模板sql

-- DDL template for one physical shard table. The tableName placeholder below is
-- filled in by the createTable step of the sharding algorithm before the script runs.
CREATE TABLE if not exists ${tableName} (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_id` varchar(200) NOT NULL,
  `name` varchar(255) DEFAULT NULL COMMENT '收货人',
  `address` varchar(255) DEFAULT NULL COMMENT '收货人地址',
  `phone` varchar(255) DEFAULT NULL COMMENT '联系电话',
  -- create_time is the column the sharding rules use to pick the monthly table.
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `order_id_index` (`order_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

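SPI配置

在 resources/META-INF/services 目录下新建名为 org.apache.shardingsphere.sharding.spi.ShardingAlgorithm 的文件,文件内容为上面两个分表算法类(KafkaTestTableStandardShardingAlgorithm 和 TestTableStandardShardingAlgorithm)的全限定类名,每行一个。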



版权声明:本文为juanqiao7497原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。