Wow! SpringBoot+Mybatis+Druid+JTA+Atomikos for distributed transactions (switching between multiple data sources with annotations)

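Sharing: switching between multiple data sources with annotations (example: MySQL and Oracle)

Overview:

The previous three articles covered ways of switching data sources that do not support distributed transactions. Whether you switch by package or through AOP, the data source cannot be switched inside a Service method annotated with @Transactional. As a result, a transaction that spans several data sources cannot be controlled, which is bad news for the business logic.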


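In this article I switch data sources dynamically with an AOP annotation and, along the way, integrate jta-atomikos to get rid of the annoying transaction problem. The persistence framework is mybatis and the connection pool is druid. Both are widely used around us at the moment, so you can extend this scaffold further to fit the actual needs of your project.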

The data source cannot be switched because the traditional switching approach configures the transaction manager like this:

    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager()  {
        return new DataSourceTransactionManager(dynamicDataSource());
    }

Transactions here are managed through the constructor of the DataSourceTransactionManager class, so let's look at that class's constructors:

    @Nullable
    private DataSource dataSource;
    private boolean enforceReadOnly;

    public DataSourceTransactionManager() {
        this.enforceReadOnly = false;
        this.setNestedTransactionAllowed(true);
    }

    public DataSourceTransactionManager(DataSource dataSource) {
        this();
        this.setDataSource(dataSource);
        this.afterPropertiesSet();
    }

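By the time the transaction annotation takes effect, the DataSource has already been set on the transaction manager, and there is exactly one.

So no matter how we ask for a different data source from the outside, the switch has no effect.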
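To make the reasoning above concrete, here is a minimal sketch in plain Java. It is a simplified model, not Spring's actual code: the names `SingleDataSourceTxSketch`, `Tx` and `openConnection` are hypothetical, and the "connection" is just a string. It assumes the transaction obtains its connection once at the start and reuses it for every statement, which is why changing the lookup key afterwards changes nothing.

```java
// Simplified, hypothetical model; no Spring classes are used here.
class SingleDataSourceTxSketch {

    // Lookup key for the calling thread (plays the role of a context holder).
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Stand-in for a routing data source: the target is resolved only
    // at the moment a connection is requested.
    static String openConnection() {
        String key = KEY.get();
        return "connection-to-" + (key == null ? "one" : key);
    }

    // Stand-in for a single-data-source transaction: one connection is
    // fetched when the transaction begins and reused by every statement.
    static class Tx {
        private final String boundConnection = openConnection();

        String execute(String sql) {
            return boundConnection + ": " + sql;
        }
    }

    static String[] runInTransaction() {
        KEY.set("one");
        Tx tx = new Tx();                                  // connection is bound here
        String first = tx.execute("select 1");
        KEY.set("oracleTwo");                              // switching the key after begin...
        String second = tx.execute("select 1 from dual");  // ...still uses the bound connection
        KEY.remove();
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        for (String line : runInTransaction()) {
            System.out.println(line);
        }
    }
}
```

Both statements run on the connection to "one" in this model, even though the key was changed to "oracleTwo" in between.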




Here is how to implement it:

The pom file needs one extra dependency:

        <!-- distributed transactions -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

yml file:

spring:
  profiles: dev
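  # Database configuration
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
      one:
        url: jdbc:mysql://*****:3306/one?useUnicode=true&serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false
        username: username
        password: password
        driverClassName: com.mysql.cj.jdbc.Driver
        # Number of physical connections created at initialization. Initialization happens on an explicit call to init, or on the first getConnection
        initialSize: 5
        # Minimum number of connections in the pool
        minIdle: 5
        # Maximum number of connections in the pool
        maxActive: 10
        # Maximum wait time when acquiring a connection, in milliseconds. Once maxWait is set, a fair lock is used by default and concurrency efficiency drops somewhat; set useUnfairLock to true to use an unfair lock if needed.
        maxWait: 60000
        # Interval at which the Destroy thread checks connections; a connection idle for at least minEvictableIdleTimeMillis is physically closed.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection may stay idle without being evicted
        minEvictableIdleTimeMillis: 300000
        # SQL used to check whether a connection is valid; it depends on the database dialect, e.g. Oracle should use SELECT 1 FROM DUAL
        validationQuery: SELECT 1
        # Recommended to be true; it does not hurt performance and keeps things safe. Checked when a connection is requested: if it has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to check that it is still valid.
        testWhileIdle: true
        # Run validationQuery when a connection is borrowed; enabling this lowers performance.
        testOnBorrow: false
        # Run validationQuery when a connection is returned; enabling this lowers performance.
        testOnReturn: false
        # Whether to reclaim connections that have timed out
        removeAbandoned: true
        # Timeout (in seconds)
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true
      oracleTwo:
        url: jdbc:oracle:thin:@*****:1521:oracleTwo
        username: username
        password: password
        driverClassName: oracle.jdbc.driver.OracleDriver
        # Number of physical connections created at initialization. Initialization happens on an explicit call to init, or on the first getConnection
        initialSize: 5
        # Minimum number of connections in the pool
        minIdle: 5
        # Maximum number of connections in the pool
        maxActive: 10
        # Maximum wait time when acquiring a connection, in milliseconds. Once maxWait is set, a fair lock is used by default and concurrency efficiency drops somewhat; set useUnfairLock to true to use an unfair lock if needed.
        maxWait: 60000
        # Interval at which the Destroy thread checks connections; a connection idle for at least minEvictableIdleTimeMillis is physically closed.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection may stay idle without being evicted
        minEvictableIdleTimeMillis: 300000
        # SQL used to check whether a connection is valid; it depends on the database dialect, e.g. Oracle should use SELECT 1 FROM DUAL
        validationQuery: SELECT 1 FROM DUAL
        # Recommended to be true; it does not hurt performance and keeps things safe. Checked when a connection is requested: if it has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to check that it is still valid.
        testWhileIdle: true
        # Run validationQuery when a connection is borrowed; enabling this lowers performance.
        testOnBorrow: false
        # Run validationQuery when a connection is returned; enabling this lowers performance.
        testOnReturn: false
        # Whether to reclaim connections that have timed out
        removeAbandoned: true
        # Timeout (in seconds)
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true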




Now, bring on the code!



Custom annotation KYDataSource.java
import java.lang.annotation.*;

/**
 * @Description: custom annotation
 * @Author:YJG
 * @Date 9:24 2020/5/25
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface KYDataSource {
    String value() default "one";
}


Data source names DataSourceName.java
/**
 * @Author:yjg86166
 * @Date:2020/5/25 16:41
 */
public final class DataSourceName {

    public static final String ONE = "one";
    public static final String ORACLETWO = "oracleTwo";
}


Holder class for dynamic data source switching, DataSourceContextHolder.java
import lombok.extern.slf4j.Slf4j;

/**
 * @Description: holds the data source key used for dynamic switching
 * @Author:YJG
 * @Date 9:36 2020/5/25
 */
@Slf4j
public class DataSourceContextHolder  {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();


    /** Set the data source name */
    public static void setDataSource(String dataSource){
        log.info("Using data source <{}>!",dataSource);
        contextHolder.set(dataSource);
    }

    public static String getDataSource(){
        return contextHolder.get();
    }

    /** Clear the data source */
    public static void clearDataSource(){
        contextHolder.remove();
    }

}
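One consequence of keeping the key in a ThreadLocal is worth seeing in isolation: the value is only visible on the thread that set it. The sketch below uses a hypothetical class name, `ThreadLocalIsolationSketch`, and plain Java only. It suggests that work handed off to another thread (an async task, for example) would not see the caller's data source key and would have to set its own.

```java
// Hypothetical stand-alone demo of ThreadLocal visibility; not part of the original project.
class ThreadLocalIsolationSketch {

    static final ThreadLocal<String> HOLDER = new ThreadLocal<>();

    // Reads HOLDER from a freshly started thread and returns what that thread saw.
    static String readFromNewThread() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = HOLDER.get());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        HOLDER.set("oracleTwo");
        System.out.println("caller thread: " + HOLDER.get());
        System.out.println("worker thread: " + readFromNewThread());
        HOLDER.remove();
    }
}
```

In this demo the worker thread sees no key at all, so under the setup described in this article it would end up on the default data source.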


Custom AOP aspect for switching data sources by annotation, DataSourceAspect.java
import ky.arraign.common.datasourcetools.kyannotation.KYDataSource;
import ky.arraign.common.datasourcetools.kydsconfig.DataSourceContextHolder;
import ky.arraign.common.datasourcetools.kydsconfig.DataSourceName;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;


/**
 * @Description: custom aspect
 * @Author:YJG
 * @Date 9:43 2020/5/25
 */
@Aspect
@Component
@Slf4j
public class DataSourceAspect {

    /**
     * Pointcut: every method annotated with KYDataSource.
     * Use the fully qualified name of your own @interface here.
     */
    @Pointcut("@annotation(ky.arraign.common.datasourcetools.kyannotation.KYDataSource)")
    public void dataSourcePointCut() {}

    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        KYDataSource ds;
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        // read the custom annotation
        ds = method.getAnnotation(KYDataSource.class);
        if (ds == null) {
            // no custom annotation found: fall back to the default data source
            DataSourceContextHolder.setDataSource(DataSourceName.ONE);
            log.info("Entering method <{}>, using default data source <<{}>>", method.getName(),DataSourceName.ONE);
        } else {
            // annotation present: switch to the data source named by its value
            DataSourceContextHolder.setDataSource(ds.value());
            log.info("Entering method <{}>, data source is <<{}>>",method.getName(),ds.value());
        }
        return point.proceed();
    }


    @After(value = "dataSourcePointCut()")
    public void afterSwitchDS(JoinPoint point) {
        DataSourceContextHolder.clearDataSource();
        log.info("Data source cleared!");
    }

}

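At this point the basic scaffolding for dynamic switching is done. Next comes the core part:



DataSourceConfig.java :

It configures the information for each DataSource. The data sources are created with DruidXADataSource, which supports JTA transactions;

Each DataSource is associated with its own AtomikosDataSourceBean, so that its transactions are handed over to the JTA transaction manager;

The session factory is overridden so that each data source gets its own.

The overridden sqlSessionTemplate is configured so that the sqlsession of whichever data source is actually in use is tied to Spring's transaction mechanism.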


import com.alibaba.druid.pool.xa.DruidXADataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description: configuration class for dynamic data source switching
 * @Author:YJG
 * @Date 9:39 2020/5/25
 */
@Configuration
@Order(-1)
@Slf4j
@MapperScan(sqlSessionTemplateRef = "sqlSessionTemplate", basePackages = "ky.One.*.mapper")
public class DataSourceConfig {
    /**
     * Bring in the data sources
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.one")
    public DataSource getDateSourceOne() {
        log.info("---------- Building the One data source ----------");
        return new DruidXADataSource();
    }


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.oracle-two")
    public DataSource getDateSourceOracleTwo() {
        log.info("---------- Building the OracleTwo data source ----------");
        return new DruidXADataSource();
    }

    /**
     * Create the Atomikos data source OneDataSource, which supports XA transactions
     */
    @Bean
    public DataSource dataSourceOne(DataSource getDateSourceOne) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) getDateSourceOne);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        // each data source must be given a unique resource name
        sourceBean.setUniqueResourceName("OneDataSource");
        log.info("---------- AtomikosDataSourceBean configured for the One data source ----------");
        return sourceBean;
    }



    /**
     * Create the Atomikos data source OracleTwoDataSource, which supports XA transactions
     */
    @Bean
    public DataSource dataSourceOracleTwo(DataSource getDateSourceOracleTwo) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) getDateSourceOracleTwo);
        sourceBean.setPoolSize(5);
        // Oracle needs "SELECT 1 FROM DUAL"
        sourceBean.setTestQuery("SELECT 1 FROM DUAL");
        // each data source must be given a unique resource name
        sourceBean.setUniqueResourceName("OracleTwoDataSource");
        log.info("---------- AtomikosDataSourceBean configured for the OracleTwo data source ----------");
        return sourceBean;
    }

    /**
     * @param dataSourceOne the data source OneDataSource
     * @return the session factory for data source dataSourceOne
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryOne(DataSource dataSourceOne)
            throws Exception {
        log.info("---------- SqlSessionFactory >>>> sqlSessionFactoryOne ----------");
        return createSqlSessionFactory(dataSourceOne, DataSourceName.ONE);
    }


    /**
     * @param dataSourceOracleTwo the data source OracleTwoDataSource
     * @return the session factory for data source dataSourceOracleTwo
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryOracleTwo(DataSource dataSourceOracleTwo)
            throws Exception {
        log.info("---------- SqlSessionFactory >>>> sqlSessionFactoryOracleTwo ----------");
        return createSqlSessionFactory(dataSourceOracleTwo, DataSourceName.ORACLETWO);
    }


    /***
     * sqlSessionTemplate works together with Spring transaction management to make sure that the SqlSession
     * actually used is the one associated with the current Spring transaction. It also manages the session
     * lifecycle, including closing, committing or rolling back the session as the Spring transaction requires
     */
    @Bean
    public CustomSqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryOne,
                                                       SqlSessionFactory sqlSessionFactoryOracleTwo) {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>(2);
        sqlSessionFactoryMap.put(DataSourceName.ONE, sqlSessionFactoryOne);
        sqlSessionFactoryMap.put(DataSourceName.ORACLETWO, sqlSessionFactoryOracleTwo);
        CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(sqlSessionFactoryOne);
        customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
        log.info("Configured CustomSqlSessionTemplate, a subclass of SqlSessionTemplate");
        return customSqlSessionTemplate;
    }

    /***
     * Build a custom session factory
     * @param dataSource the data source
     * @return the custom session factory
     */
    private SqlSessionFactory createSqlSessionFactory(DataSource dataSource, String dataSourceName) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        log.info("SqlSessionFactoryBean: setting data source <<{}>>", dataSourceName);
        factoryBean.setDataSource(dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        log.info("SqlSessionFactoryBean<<{}>>: configuring Mybatis", dataSourceName);
        factoryBean.setConfiguration(configuration);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        // location of the mapper.xml files
        log.info("SqlSessionFactoryBean<<{}>>: configuring the mapper.xml location", dataSourceName);
        factoryBean.setMapperLocations(
                resolver.getResources("classpath*:ky/One/**/mapper/xml/*Mapper.xml"));
        return factoryBean.getObject();
    }
}



The custom CustomSqlSessionTemplate.java used above (overrides SqlSessionTemplate)
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;


/**
 * @Author:YJG86166
 * @Date:2020/7/30 13:46
 */
public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    /**
     * Target session factories, passed in as a Map keyed by data source name
     * @param targetSqlSessionFactories
     */
    public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
        this.targetSqlSessionFactories = targetSqlSessionFactories;
    }
    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }
    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }
    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
                .getEnvironment().getDataSource(), true));
    }
    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
                                    PersistenceExceptionTranslator exceptionTranslator) {
        super(sqlSessionFactory, executorType, exceptionTranslator);
        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;
        this.sqlSessionProxy = (SqlSession) newProxyInstance(
                SqlSessionFactory.class.getClassLoader(),
                new Class[] { SqlSession.class },
                new SqlSessionInterceptor());
        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }
    // Look up the session factory for the current data source via DataSourceContextHolder
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        String dataSourceKey = DataSourceContextHolder.getDataSource();
        // guard against the map not having been set yet
        SqlSessionFactory targetSqlSessionFactory =
                targetSqlSessionFactories == null ? null : targetSqlSessionFactories.get(dataSourceKey);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        }
        // no factory registered for the current key: fall back to the default one
        Assert.notNull(defaultTargetSqlSessionFactory, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
        return defaultTargetSqlSessionFactory;
    }


    @Override
    public Configuration getConfiguration() {
        return this.getSqlSessionFactory().getConfiguration();
    }
    public ExecutorType getExecutorType() {
        return this.executorType;
    }
    public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
        return this.exceptionTranslator;
    }
    /**
     * {@inheritDoc}
     */
    public <T> T selectOne(String statement) {
        return this.sqlSessionProxy.<T> selectOne(statement);
    }
    /**
     * {@inheritDoc}
     */
    public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T> selectOne(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
    }
    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
    }
    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
    }
    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement) {
        return this.sqlSessionProxy.<E> selectList(statement);
    }
    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
    }
    /**
     * {@inheritDoc}
     */
    public void select(String statement, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, handler);
    }
    /**
     * {@inheritDoc}
     */
    public void select(String statement, Object parameter, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, handler);
    }
    /**
     * {@inheritDoc}
     */
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
    }
    /**
     * {@inheritDoc}
     */
    public int insert(String statement) {
        return this.sqlSessionProxy.insert(statement);
    }
    /**
     * {@inheritDoc}
     */
    public int insert(String statement, Object parameter) {
        return this.sqlSessionProxy.insert(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    public int update(String statement) {
        return this.sqlSessionProxy.update(statement);
    }
    /**
     * {@inheritDoc}
     */
    public int update(String statement, Object parameter) {
        return this.sqlSessionProxy.update(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    public int delete(String statement) {
        return this.sqlSessionProxy.delete(statement);
    }
    /**
     * {@inheritDoc}
     */
    public int delete(String statement, Object parameter) {
        return this.sqlSessionProxy.delete(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    public <T> T getMapper(Class<T> type) {
        return getConfiguration().getMapper(type, this);
    }
    /**
     * {@inheritDoc}
     */
    public void commit() {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    public void commit(boolean force) {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    public void rollback() {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    public void rollback(boolean force) {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    public void close() {
        throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    public void clearCache() {
        this.sqlSessionProxy.clearCache();
    }
    /**
     * {@inheritDoc}
     */
    public Connection getConnection() {
        return this.sqlSessionProxy.getConnection();
    }
    /**
     * {@inheritDoc}
     * @since 1.0.2
     */
    public List<BatchResult> flushStatements() {
        return this.sqlSessionProxy.flushStatements();
    }
    /**
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
     * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
     * the {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(
                    CustomSqlSessionTemplate.this.getSqlSessionFactory(),
                    CustomSqlSessionTemplate.this.executorType,
                    CustomSqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }


}
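The routing idea in getSqlSessionFactory() above can be shown on its own: look the current key up in a map and fall back to a default target when nothing matches. The sketch below is a stand-alone illustration with hypothetical names (`RoutingLookupSketch`, `register`, `resolve`). It uses plain strings where the real class holds SqlSessionFactory instances.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, framework-free illustration of "lookup by key with a default fallback".
class RoutingLookupSketch<T> {

    private final Map<String, T> targets = new HashMap<>();
    private final T defaultTarget;

    RoutingLookupSketch(T defaultTarget) {
        this.defaultTarget = defaultTarget;
    }

    RoutingLookupSketch<T> register(String key, T target) {
        targets.put(key, target);
        return this;
    }

    // A null key or an unknown key resolves to the default target.
    T resolve(String key) {
        T target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }

    public static void main(String[] args) {
        RoutingLookupSketch<String> router = new RoutingLookupSketch<>("factoryOne")
                .register("one", "factoryOne")
                .register("oracleTwo", "factoryOracleTwo");
        System.out.println(router.resolve("oracleTwo"));
        System.out.println(router.resolve(null));
    }
}
```

Because the lookup runs on every call rather than once per transaction, changing the key in the middle of a method takes effect for the next mapper call.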


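XA distributed transaction manager, XATransactionManagerConfig.java:

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration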
@EnableTransactionManagement
public class XATransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }

    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
                                                         TransactionManager transactionManager) {
        return new JtaTransactionManager(userTransaction, transactionManager);
    }

}
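For readers new to XA, the idea a JTA coordinator builds on is two-phase commit: ask every participant to prepare, and commit only if all of them vote yes. The sketch below is a heavily simplified illustration with hypothetical names (`TwoPhaseCommitSketch`, `Participant`, `LoggingParticipant`). It is not how Atomikos is implemented, and it leaves out recovery logs, timeouts and failure handling during the commit phase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified two-phase commit; real coordinators do much more.
class TwoPhaseCommitSketch {

    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2, when everyone voted yes
        void rollback();     // phase 2, when anyone voted no
    }

    // Participant that records every call it receives in a shared log.
    static class LoggingParticipant implements Participant {
        private final String name;
        private final boolean votesYes;
        private final List<String> log;

        LoggingParticipant(String name, boolean votesYes, List<String> log) {
            this.name = name;
            this.votesYes = votesYes;
            this.log = log;
        }

        public boolean prepare() { log.add(name + ":prepare"); return votesYes; }
        public void commit()     { log.add(name + ":commit"); }
        public void rollback()   { log.add(name + ":rollback"); }
    }

    // Returns true when the transaction committed on all participants.
    static boolean complete(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                // one "no" vote is enough: roll back every participant
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    static List<String> demo(boolean oracleVotesYes) {
        List<String> log = new ArrayList<>();
        List<Participant> participants = new ArrayList<>();
        participants.add(new LoggingParticipant("mysql", true, log));
        participants.add(new LoggingParticipant("oracle", oracleVotesYes, log));
        log.add("committed=" + complete(participants));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

When the second participant votes no, neither database commits, which is the property the MySQL plus Oracle setup in this article relies on.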


On the application class, exclude the auto-loaded data source configuration class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class JtadbsourceApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(JtadbsourceApplication.class, args);
    }
}



Configuration complete! Usage is as follows:
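import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * @Author:yjg86166
 * @Date:2020/5/25 16:25
 */
@Service
@Slf4j
public class DemoApiImpl implements DemoApi {

    @Autowired
    private DemoMapper demoMapper;

    //@KYDataSource // ONE is the default data source, so the value can be omitted
    //@KYDataSource(DataSourceName.ONE) // default data source, can be omitted
    // the annotation value names the data source to use
    @Override
    @KYDataSource(DataSourceName.ONE)
    @Transactional(rollbackFor = Exception.class)
    public List<Demo> listResult() {
        // the ONE data source is in use at this point
        List<Demo> demos = demoMapper.listDeptInfo();
        // switch to ORACLETWO
        DataSourceContextHolder.setDataSource(DataSourceName.ORACLETWO);
        List<Demo> demos1 = this.listUseArraign();
        // ... business logic
        // if the logic below does not need ORACLETWO, switch back to ONE
        DataSourceContextHolder.setDataSource(DataSourceName.ONE);
        return demos;
    }

    public List<Demo> listUseArraign(){
        return demoMapper.listDeptInfo();
    }
}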
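Switching by hand inside a method means remembering to switch back, also when an exception is thrown. One possible way to make that harder to forget is a small helper that sets the key, runs the work, and restores the previous key in a finally block. The class and method names below (`DataSourceScopeSketch`, `withDataSource`) are hypothetical and not part of the original project. The sketch keeps its own ThreadLocal so that it runs stand-alone.

```java
import java.util.function.Supplier;

// Hypothetical helper; in the project it would delegate to DataSourceContextHolder instead.
class DataSourceScopeSketch {

    private static final ThreadLocal<String> HOLDER = new ThreadLocal<>();

    static String current() {
        return HOLDER.get();
    }

    // Runs the action with the given key and always restores the previous key afterwards.
    static <T> T withDataSource(String key, Supplier<T> action) {
        String previous = HOLDER.get();
        HOLDER.set(key);
        try {
            return action.get();
        } finally {
            if (previous == null) {
                HOLDER.remove();
            } else {
                HOLDER.set(previous);
            }
        }
    }

    public static void main(String[] args) {
        String result = withDataSource("one", () -> {
            String inner = withDataSource("oracleTwo", DataSourceScopeSketch::current);
            return inner + " then " + current();
        });
        System.out.println(result);
        System.out.println("after: " + current());
    }
}
```

With a helper like this, the manual set and reset calls in listResult() would collapse into a single scoped call around the Oracle query.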

Sharing!!!!

Most of the code above is adapted from

Springboot 整合druid+mybatis+jta分布式事务+多数据源aop注解动态切换 (一篇到位)

Thanks to the original author!