一,什么是分布式事务?
分布式事务就是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。以上是百度百科的解释,简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
说起分布式事务,提一下上一篇中集成多数据源中的一个隐患问题,先看一下这段代码。
@Autowired
public User2Dao user2Dao;
@Transactional
public void addUser1AndUser2(String username,int age) {
user2Dao.insert(username, age);
int i=1/0;
user1Dao.insert(username, age);
}
这段代码是一个简单的事务处理,我特意加入了int i=1/0;这句,这一眼就能看出来,这段代码肯定报异常,但是这不是我们关注的问题,我们要关注的是数据是否插入了数据库,先要说明的是user1对应的数据源加了@primary注释,是服务器默认首先访问的,那么问题来了,这段代码到底会不会回滚数据库,答案是否定的,首先,他们是两个不同的数据源,在一个数据源中引用了另一个数据源,但是当前回滚的事务管理是当前数据源的,并管理不到另一个数据源,所以user2数据库中任然会插入数据,user1数据源回滚,那么怎么解决事务的统一管理呢?这就是我们今天的主题。
二,使用atomikos+jta实现分布式事务统一管理
1.要使用atomikos+jta就要引入相关依赖
<!-- automatic+jta的分布式事务管理 -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-jta-atomikos -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
剩下的pom的文件中依赖和上篇中的pom文件依赖相同,这个是额外加入。
这里我们还要自己在资源文件中定义数据源。
下面是application.properties资源文件中的代码。
#automatic+jta
mysql.datasource.test1.url=jdbc:mysql://localhost:3306/test1
mysql.datasource.test1.username=root
mysql.datasource.test1.password=.....
mysql.datasource.test1.minPoolSize=3
mysql.datasource.test1.maxPoolSize=25
mysql.datasource.test1.maxLifetime=20000
mysql.datasource.test1.borrowConnectionTimeout=30
mysql.datasource.test1.loginTimeout=30
mysql.datasource.test1.maintenanceInterval=60
mysql.datasource.test1.maxIdleTime=60
mysql.datasource.test2.url=jdbc:mysql://localhost:3306/test2
mysql.datasource.test2.username=root
mysql.datasource.test2.password=.....
mysql.datasource.test2.minPoolSize=3
mysql.datasource.test2.maxPoolSize=25
mysql.datasource.test2.maxLifetime=20000
mysql.datasource.test2.borrowConnectionTimeout=30
mysql.datasource.test2.loginTimeout=30
mysql.datasource.test2.maintenanceInterval=60
mysql.datasource.test2.maxIdleTime=60
接下来就是配置数据源了,这里使用了atomikos对数据源进行统一的管理,从而统一管理事务。
下面是数据源1的配置代码
import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import cn.shinelon.config.DBConfig1;
@Configuration
@MapperScan(basePackages="cn.shinelon.test1",sqlSessionFactoryRef="test1SqlSessionFactory")
public class test1MybatisConfig {
//配置数据源
@Primary
@Bean(name="test1Datasource")
public DataSource testDatasource(DBConfig1 config1) throws SQLException {
MysqlXADataSource mysqlXADataSource=new MysqlXADataSource();
mysqlXADataSource.setUrl(config1.getUrl());
mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXADataSource.setPassword(config1.getPassword());
mysqlXADataSource.setUser(config1.getUsername());
mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);
AtomikosDataSourceBean atomikosDataSourceBean=new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
atomikosDataSourceBean.setUniqueResourceName("test1Datasource");
atomikosDataSourceBean.setMinPoolSize(config1.getMinPoolSize());
atomikosDataSourceBean.setMaxPoolSize(config1.getMaxPoolSize());
atomikosDataSourceBean.setMaxLifetime(config1.getMaxLifetime());
atomikosDataSourceBean.setBorrowConnectionTimeout(config1.getBorrowConnectionTimeout());
atomikosDataSourceBean.setLoginTimeout(config1.getLoginTimeout());
atomikosDataSourceBean.setMaintenanceInterval(config1.getMaintenanceInterval());
atomikosDataSourceBean.setMaxIdleTime(config1.getMaxIdleTime());
return atomikosDataSourceBean;
}
@Primary
@Bean(name="test1SqlSessionFactory")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1Datasource")DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean=new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
//如果还有分页等其他事务
// bean.setMapperLocations(new PathMatchingResourcePatternResolver().
// getResources("classpath:mybatis/test1/*.xml"));
return bean.getObject();
}
@Primary
@Bean(name="test1SqlSessionTemplate")
public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("test1SqlSessionFactory")
SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
数据源2的配置代码和这个一样,只是把数据源1相关的改为数据源2就可以了,这里就省略了。
接下来我们还需要两个类似实体类的配置文件来映射资源文件中的配置。
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix="mysql.datasource.test1")
public class DBConfig1 {
private String url;
private String username;
private String password;
private int minPoolSize;
private int maxPoolSize;
private int maxLifetime;
private int borrowConnectionTimeout;
private int loginTimeout;
private int maintenanceInterval;
private int maxIdleTime;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getMinPoolSize() {
return minPoolSize;
}
public void setMinPoolSize(int minPoolSize) {
this.minPoolSize = minPoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getMaxLifetime() {
return maxLifetime;
}
public void setMaxLifetime(int maxLifetime) {
this.maxLifetime = maxLifetime;
}
public int getBorrowConnectionTimeout() {
return borrowConnectionTimeout;
}
public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
this.borrowConnectionTimeout = borrowConnectionTimeout;
}
public int getLoginTimeout() {
return loginTimeout;
}
public void setLoginTimeout(int loginTimeout) {
this.loginTimeout = loginTimeout;
}
public int getMaintenanceInterval() {
return maintenanceInterval;
}
public void setMaintenanceInterval(int maintenanceInterval) {
this.maintenanceInterval = maintenanceInterval;
}
public int getMaxIdleTime() {
return maxIdleTime;
}
public void setMaxIdleTime(int maxIdleTime) {
this.maxIdleTime = maxIdleTime;
}
}
同样的数据源2也需要这样的映射配置,代码只是修改数据源即可,这里省略。
其他的配置代码同上一篇集成多数据源中代码,这里我们只讨论开篇的一个问题,如何统一管理事务。
//测试分布式事务的处理
@Autowired
public User2Dao user2Dao;
@Transactional
public void addUser1AndUser2(String username,int age) {
user2Dao.insert(username, age);
int i=1/0;
user1Dao.insert(username, age);
}
这段代码,刚开始我们说数据源2中的事务不会回滚,当发生异常的时候,数据库中依旧会插入数据。但是经过我们上面的一番操作,集成atomikos+jta就轻松解决了多数据源的事务统一管理,感兴趣的同学可以自己测试一下。