在项目中,有时会遇到使用多个数据源的情况,并且需要支持事务。解决方式可以有以下几种:
- 对每个数据源手动配置orm框架(mybatis、jpa等)及事务管理器
- 使用 dynamic-datasource-spring-boot-starter 配置动态数据源
这里以同时使用jpa和mybatis-plus两种orm框架操作不同数据源为例,分别介绍两种实现方式。
    
    
    首先了解一下 spring 事务实现的方式
   
    以
    
     @Transactional
    
    注解为例 (也可以看 TransactionTemplate, 这个流程更简单一点)。
   
    入口:
    
     ProxyTransactionManagementConfiguration
    
    
    (从 config 类入手,需要哪些bean一目了然,然后直接顺着看下去就可以了)
   
主要有以下3个bean
- 
     
 TransactionAttributeSource
 
 :实现是 AnnotationTransactionAttributeSource, 提供从(存在
 
 @Transactional
 
 注解的)方法上读取事务的属性(注解的属性)的功能
- 
     
 TransactionInterceptor
 
 :事务方法拦截器的bean,在执行事务方法时,转到 (
 
 TransactionAspectSupport#invokeWithinTransaction
 
 ) 方法,即spring事务处理的主要逻辑。
- 
     
 BeanFactoryTransactionAttributeSourceAdvisor
 
 :一个advisor(包含一个 Pointcut 切点和一个 Advice 通知),advice就是上面的事务拦截器,Pointcut 切点匹配能通过
 
 TransactionAttributeSource
 
 获取到事务信息的方法。
    拦截器逻辑大概如下
    
     
   
在执行被拦截的事务方法中对数据进行crud时,orm框架会从 TransactionSynchronizationManager 中的 ThradLocal 获取 之前创建的 connection (DataSourceUtils);
    
    
    多数据源实现事务
   
    
    
    每个数据源手动配置orm框架
   
这种方式是通过手动声明创建orm框架对应的bean来实现多数据源的操作,即每个数据源都自己手动创建一套对用的bean。
(其实如果使用的spring而不是springboot的话,就不会有这种多数据源的疑问,因为本来就要自己声明bean。)
    
    
    依赖
   
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- 也可以直接依赖 mybatis-plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.2</version>
</dependency>
    
    
    配置文件
   
datasource:
  test2:
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://127.0.0.1:3306/test2?useSSL=true&maxAllowedPacket=16777216&pinGlobalTxToPhysicalConnection=true&nullNamePatternMatchesAll=true&autoReconnect=true&failOverReadOnly=false&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
    username: 
    password: 
    poolName: test2
  test:
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://127.0.0.1:3306/test?useSSL=true&maxAllowedPacket=16777216&pinGlobalTxToPhysicalConnection=true&nullNamePatternMatchesAll=true&autoReconnect=true&failOverReadOnly=false&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
    username: 
    password: 
    poolName: test
    
    
    配置jpa
   
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "testEntityManagerFactory",
        basePackages = {"demo.jpa.repository"}
)
public class JpaConfig {
    /**
     * 配置 DataSource,读取 datasource.test 下的配置
     * @Primary 设置为默认的datasource
     */
    @Primary
    @Bean(name = "testDataSource")
    @ConfigurationProperties(prefix = "datasource.test")
    public DataSource dataSource() {
        HikariDataSource hikariDataSource = DataSourceBuilder.create().type(HikariDataSource.class).build();
        hikariDataSource.setConnectionInitSql("set names utf8mb4");
        return hikariDataSource;
    }
    @Bean(name = "testEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(
            EntityManagerFactoryBuilder builder,
            @Qualifier("testDataSource") DataSource dataSource) {
        return builder
                .dataSource(dataSource)
                .packages("demo.jpa.entity")
                .build();
    }
    @Bean(name = "testEntityManager")
    public EntityManager entityManager(@Qualifier("testEntityManagerFactory") EntityManagerFactory factory) {
        return factory.createEntityManager();
    }
    // 配置 jpa 事务管理器
    @Primary
    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager(
            @Qualifier("testEntityManagerFactory") EntityManagerFactory
                    entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
}
    
    
    配置 mybatis-plus
   
@Configuration
public class MybaticConfig {
    @Bean(name = "test2Datasource")
    @ConfigurationProperties(prefix = "datasource.test2")
    public DataSource dataSource() {
        HikariDataSource hikariDataSource = DataSourceBuilder.create().type(HikariDataSource.class).build();
        hikariDataSource.setConnectionInitSql("set names utf8mb4");
        return hikariDataSource;
    }
    /**
     * 手动配置mybatis-plus sqlSessionFactory
     * 注意: 部分mybatis-plus-starter中的默认配置得手动加上
     */
    @Bean(name = "test2SqlSessionFactory")
    public MybatisSqlSessionFactoryBean tikuSqlSessionFactory(@Qualifier("test2Datasource") DataSource dataSource) throws IOException {
        MybatisSqlSessionFactoryBean mybatisSqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        mybatisSqlSessionFactoryBean.setTypeAliasesPackage("demo.mybatisplus.entity");
        mybatisSqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/*.xml"));
        mybatisSqlSessionFactoryBean.setDataSource(dataSource);
        // 设置分页插件
        mybatisSqlSessionFactoryBean.setPlugins(mybatisPlusInterceptor());
        return mybatisSqlSessionFactoryBean;
    }
    // 配置 mapper 扫描路径
    @Bean(name = "test2MapperScannerConfigurer")
    public MapperScannerConfigurer mapperScannerConfigurer() {
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setBasePackage("demo.mybatisplus.mapper");
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("test2SqlSessionFactory");
        return mapperScannerConfigurer;
    }
    // 分页插件
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
    // 配置 事务管理器   
    @Bean(name = "test2TransactionManager")
    public DataSourceTransactionManager tikuTransactionManager(@Qualifier("test2Datasource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}
    
    
    使用
   
在使用事务的时候,在 @Transactional 注解中指定对应的事务管理器的bean就可以了。
    // jpa 事务,(配置了primary,所以不用指定事务管理器)
    @Transactional
    public void testJpaTx() {
        UserInfo userInfo = new UserInfo();
        userInfo.setName("name-1");
        userInfoRepository.save(userInfo);
        // 这里debug断点,验证事务是否生效
        userInfo = new UserInfo();
        userInfo.setName("name-2");
        userInfoRepository.save(userInfo);
    }
    
    // mybatis-plus, 指定 事务管理器
    @Transactional(transactionManager = "test2TransactionManager")
    public void testMbTx() {
        UserInfo2 userInfo = new UserInfo2();
        userInfo.setName("name-1");
        userInfo2Mapper.insert(userInfo);
        userInfo2 = new UserInfo2();
        userInfo.setName("name-2");
        userInfo2Mapper.insert(userInfo);
    }
不支持多个数据源事务,手动配置较繁琐
    
    
    动态数据源
   
dynamic-datasource-spring-boot-starter 也是 baomidou 提供的(同mybatis-plus),通过 @DS 注解就能实现多数据源的操作。
    
    
    依赖
   
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>3.4.1</version>
    </dependency>
    
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.2</version>
    </dependency>
        
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    
    配置文件
   
spring:
  datasource:
    dynamic:
      # 指定默认的数据源
      primary: test 
      datasource:
        test:
          driverClassName: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/test?useSSL=true&maxAllowedPacket=16777216&pinGlobalTxToPhysicalConnection=true&nullNamePatternMatchesAll=true&autoReconnect=true&failOverReadOnly=false&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
          username: 
          password: 
          poolName: test
        test2:
          driverClassName: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/test2?useSSL=true&maxAllowedPacket=16777216&pinGlobalTxToPhysicalConnection=true&nullNamePatternMatchesAll=true&autoReconnect=true&failOverReadOnly=false&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
          username: 
          password: 
          poolName: test2
    
    
    使用示例
   
只需要在对应的类或方法上加上 @DS 注解并指定数据源就可以了。
    注意:
    
     @DS 必须加在 @Transactional 对应的类或者方法上
    
    。 如在 mapper中加了@DS,但是 @Transactional 加在 service 方法中,此时获取为默认的datasource。(因为在事务中已经获取了一次datasource的connection,而此时无DS注解)
   
@DS("test")
@Repository
public interface UserInfoRepository extends JpaRepository<UserInfo, Integer> {
}
@DS("test2")
@Mapper
public interface UserInfo2Mapper extends BaseMapper<UserInfo> {
}
@Service
public class TestService {
    // 同一个数据源中的事务
    @DS("test")
    @Transactional
    public void testJpaTx() {
        UserInfo userInfo = new UserInfo();
        userInfo.setName("name-1");
        userInfoRepository.save(userInfo);
        // 这里debug断点,验证事务是否生效
        userInfo = new UserInfo();
        userInfo.setName("name-2");
        userInfoRepository.save(userInfo);
    }
    
    // 多数据源事务
    @DSTransactional
    public void testMulti() {
        UserInfo userInfo = new UserInfo();
        userInfo.setName("name-1");
        userInfoRepository.save(userInfo);
        UserInfo2 userInfo = new UserInfo2();
        userInfo.setName("name-2");
        userInfo2Mapper.insert(userInfo);
    }
    
}
    
    
    动态数据源实现原理
   
    同样看一下
    
     DynamicDataSourceAutoConfiguration
    
    这个配置相关的类就大概了解了。
   
- 
     
 DynamicRoutingDataSource
 
 : 动态数据源,内部使用 Map 保存了多个数据源。获取 connection 时,根据 ThreadLocal 中的 dsKey 获取对应的数据源- 
       另:对于多数据源事务 (
 
 TransactionContext.getXID() isNotEmpty
 
 ),会返回一个
 
 ConnectionProxy
 
 并暂存到 ConnectionFactory 中, 该 ConnectionProxy 不会执行 commit、rollback、close 操作事务相关的方法。
 public Connection getConnection() throws SQLException { String xid = TransactionContext.getXID(); if (StringUtils.isEmpty(xid)) { // 非多数据源事务直接获取对应 connection return determineDataSource().getConnection(); } else { String ds = DynamicDataSourceContextHolder.peek(); ds = StringUtils.isEmpty(ds) ? "default" : ds; // 多数据源事务,使用代理的 connection (屏蔽了 commit 等操作) ConnectionProxy connection = ConnectionFactory.getConnection(ds); return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; } } // 获取 代理的 connection, 并将其存入 ConnectionFactory, 内部维护一个 ThreadLocal<Map>, 同时会 setAutoCommit(false) 开启事务 private Connection getConnectionProxy(String ds, Connection connection) { ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds); ConnectionFactory.putConnection(ds, connectionProxy); return connectionProxy; } // DynamicRoutingDataSource // 从 ThreadLocal 获取当前 dsKey 然后获取对应 datasource public DataSource determineDataSource() { String dsKey = DynamicDataSourceContextHolder.peek(); return getDataSource(dsKey); }
- 
       另:对于多数据源事务 (
- 
     
 DynamicDataSourceAnnotationInterceptor
 
 : 处理 @DS 注解的拦截器,获取 @DS 指定的 datasource 并存入 ThreadLocal 中, 供 DynamicRoutingDataSource 使用
- 
     
 dynamicTransactionAdvisor
 
 : 处理
 
 @DSTransactional
 
 多数据源事务注解的拦截器,在执行目标方法前,标记为多数据源事务 (
 
 TransactionContext.bind(xid)
 
 ), 执行完后, 通知 ConnectionFactory 中的 connectionProxy 进行事务的 commit 或 rollback。// DynamicLocalTransactionAdvisor public Object invoke(MethodInvocation methodInvocation) throws Throwable { if (!StringUtils.isEmpty(TransactionContext.getXID())) { return methodInvocation.proceed(); } // 事务是否成功 boolean state = true; Object o; String xid = UUID.randomUUID().toString(); // 标记当前为 多数据源事务 TransactionContext.bind(xid); try { o = methodInvocation.proceed(); } catch (Exception e) { state = false; throw e; } finally { // 通知 connectionProxy 进行 commit 或 rollback ConnectionFactory.notify(state); TransactionContext.remove(); } return o; }
 
