demo来这里拿:
项目msf-database-web模块
概述
mybatis-plus作为一款优秀的mybatis插件已经得到了行业的广泛认可和使用,在mybatis-plus提供的众多功能中,有一项功能在分布式系统的构件中非常重要,那就是多数据源的支持。由于分布式系统的设计要求,需要对业务进行拆分,分表分库就自然而然变得十分常见了。但是在分表分库之后就面临一个问题:当同时需要操作的业务表分散在多个数据库中的时候,怎么保证一系列数据操作的原子性,怎么控制不同数据库之间的事务回滚呢?
巧了,优秀的mybatis-plus为了解决这个问题,
其多数据源功能支持阿里的分布式事务解决方案——Seata
!
经过我短暂的研究之后,发现Seata在github提供的mybatis-plus整合Seat的多数据源示例写的不太友好,并不是写的不好(d但是在多数据源的使用上,官方demo用的是代码配置和加载多数据源,功能代码已经侵入了业务代码),我自己就自己完成了一遍,算是Seata的第一次快速上路吧。
代码示例
构建mybatis-plus多数据源访问项目
首先搭建一个简单的项目,引入mybatis-plus的依赖和多数据源的驱动包
:
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
配置多数据源:
server:
port: 2003
spring:
application:
name: database-web
dataSourceName1: client
dataSourceName2: client2
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
# 开启所有/actuator下的端点
management:
endpoints:
web:
exposure:
include: "*"
mybatis-plus:
mapper-locations: classpath*:**/mapper/*/*Mapper.xml
---
spring:
profiles:
active: dev
cloud:
config:
name: ${spring.application.name}
uri: http://localhost:1002/
profile: dev
label: master
discovery:
enabled: true
service-id: config-center
alibaba:
seata:
tx-service-group: my_test_tx_group
redis:
database: 0 # redis连接节点
host: 127.0.0.1
port: 6379
password: liyalong
timeout: 6000ms
lettuce:
pool:
max-idle: 8 # 最大空闲连接数 默认8
max-active: 8 # 最大活跃链接数 默认8
datasource:
#单数据源配置
# driver-class-name: com.mysql.cj.jdbc.Driver
# url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
# username: root
# password: lyl512240816
#纯粹多数据源配置
dynamic:
primary: master #设置默认的数据源或者数据源组,默认值即为master
strict: false #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候会抛出异常,不启动则使用默认数据源.
datasource:
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName1}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lyl512240816
db2:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName2}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lyl512240816
db3:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata_order?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lyl512240816
db4:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata_storage?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lyl512240816
db5:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seata_pay?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: lyl512240816
eureka:
client:
serviceUrl:
defaultZone: http://localhost:1001/eureka/
使用 @DS 切换数据源。
@DS
可以注解在方法上和类上,
同时存在方法注解优先于类上注解
。
强烈建议只注解在service实现上。
注解 | 结果 |
---|---|
没有@DS | 默认数据源 |
@DS(“dsName”) | dsName可以为组名也可以为具体某个库的名称 |
此时就已经构建好了一个支持多数据源访问的项目。
增加Seata分布式事务管理支持
引入Seata依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
配置service-group(
关于事务分组之后会详细介绍
)
spring:
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
拷贝file.conf和registry.conf到resource目录下
file.conf是Seata的核心配置文件
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
registry.conf是Seata的配置中心和注册中心配置文件(在本示例中并没有用到配置中心和注册中心)
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
nacos {
application = "seata-server"
serverAddr = "localhost"
namespace = ""
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
timeout = "0"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
下载并运行TC(Transaction Coordinator 事务协调器)
在github下载地址:
https://github.com/seata/seata/releases
下载之后在本地解压:
直接运行bat文件就可以启动了,
占用端口8091
编写业务代码
以官方示例中的下单付款、扣除库存的业务场景为例
创建数据库:
# Order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
CREATE TABLE seata_order.orders
(
id INT(11) NOT NULL AUTO_INCREMENT,
user_id INT(11) DEFAULT NULL,
product_id INT(11) DEFAULT NULL,
pay_amount DECIMAL(10, 0) DEFAULT NULL,
status VARCHAR(100) DEFAULT NULL,
add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
CREATE TABLE seata_order.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8
;
# Storage
DROP DATABASE IF EXISTS seata_storage;
CREATE DATABASE seata_storage;
CREATE TABLE seata_storage.product
(
id INT(11) NOT NULL AUTO_INCREMENT,
price DOUBLE DEFAULT NULL,
stock INT(11) DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
INSERT INTO seata_storage.product (id, price, stock)
VALUES (1, 5, 10);
CREATE TABLE seata_storage.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
# Pay
DROP DATABASE IF EXISTS seata_pay;
CREATE DATABASE seata_pay;
CREATE TABLE seata_pay.account
(
id INT(11) NOT NULL AUTO_INCREMENT,
balance DOUBLE DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
CREATE TABLE seata_pay.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
INSERT INTO seata_pay.account (id, balance)
VALUES (1, 1);
SELECT auto_increment
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'seata_order'
AND TABLE_NAME = 'undo_log'
Controller:
package com.leolee.msf.controller;
import com.leolee.msf.entity.OperationResponse;
import com.leolee.msf.entity.order.PlaceOrderRequestVO;
import com.leolee.msf.service.serviceInterface.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName OrderController
* @Description: 订单
* @Author LeoLee
* @Date 2020/11/10
* @Version V1.0
**/
@RestController
@RequestMapping("/order")
public class OrderController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private OrderService orderService;
/*
* 功能描述: <br>
* 〈下单〉
* @Param: [placeOrderRequestVO]
* @Return: com.leolee.msf.entity.OperationResponse
* @Author: LeoLee
* @Date: 2020/11/10 21:27
*/
@RequestMapping(value = "/placeOrder")
public OperationResponse placeOrder(@RequestBody PlaceOrderRequestVO placeOrderRequestVO) throws Exception {
logger.info("=======================================================");
logger.info("下单订购请求:userId:[" + placeOrderRequestVO.getUserId() + "],productId[" + placeOrderRequestVO.getProductId() + "],price[" + placeOrderRequestVO.getPrice() + "]");
return orderService.placeOrder(placeOrderRequestVO);
}
}
Service:
package com.leolee.msf.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.dao.OrderMapper;
import com.leolee.msf.entity.OperationResponse;
import com.leolee.msf.entity.order.Order;
import com.leolee.msf.entity.order.OrderStatus;
import com.leolee.msf.entity.order.PlaceOrderRequestVO;
import com.leolee.msf.service.serviceInterface.OrderService;
import com.leolee.msf.service.serviceInterface.PayService;
import com.leolee.msf.service.serviceInterface.StorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @ClassName OrderServiceImpl
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/10
* @Version V1.0
**/
@Service("orderService")
public class OrderServiceImpl implements OrderService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private OrderMapper orderMapper;
@Autowired
private StorageService storageService;
@Autowired
private PayService payService;
@GlobalTransactional
@DS("db3")
@Override
public OperationResponse placeOrder(PlaceOrderRequestVO placeOrderRequestVO) throws Exception {
logger.info("=====================Order start===================");
logger.info("当前 XID: {}", RootContext.getXID());
//每人限购一件嗷
final Integer amount = 1;
final Integer price = placeOrderRequestVO.getPrice();
//创建订单
Order order = Order.builder()
.productId(placeOrderRequestVO.getProductId())
.userId(placeOrderRequestVO.getUserId())
.payAmount(placeOrderRequestVO.getPrice())
.status(OrderStatus.INIT)
.build();
int insertOrderResult = orderMapper.insert(order);
logger.info("保存订单{}", insertOrderResult > 0 ? "succeed" : "failed");
//扣减库存
boolean operationStorageResult = storageService.reduceStock(placeOrderRequestVO.getProductId(), amount);
//扣减用户余额
boolean operationBalanceResult = payService.reduceBalance(placeOrderRequestVO.getUserId(), price);
logger.info("=====================Order end====================");
//更新订单\
order.setStatus(OrderStatus.SUCCESS);
Integer updateOrderRecord = orderMapper.updateById(order);
logger.info("更新订单:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
return OperationResponse.builder()
.success(operationStorageResult && operationBalanceResult)
.build();
}
}
package com.leolee.msf.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.entity.pay.Account;
import com.leolee.msf.dao.AccountMapper;
import com.leolee.msf.service.serviceInterface.PayService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
* @ClassName PayServiceImpl
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/10
* @Version V1.0
**/
@Service("payService")
public class PayServiceImpl implements PayService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private AccountMapper accountMapper;
@DS("db5")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@Override
public boolean reduceBalance(Long userId, Integer price) throws Exception {
logger.info("======================Pay start=======================");
logger.info("当前 XID: {}", RootContext.getXID());
//检查余额
checkBalance(userId, price);
logger.info("开始扣减用户 {} 余额", userId);
Account account = accountMapper.selectById(userId);
account.setBalance(account.getBalance() - price);
Integer record = accountMapper.updateById(account);
logger.info("扣减用户 {} 余额结果:{}", userId, record > 0 ? "操作成功" : "扣减余额失败");
logger.info("======================Pay end=========================");
return record > 0;
}
/*
* 功能描述: <br>
* 〈检查账户余额是否足够支付〉
* @Param: []
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/10 22:40
*/
private void checkBalance(Long userId, Integer price) throws Exception {
logger.info("检查用户{}余额", userId);
Account account = accountMapper.selectById(userId);
Integer balance = account.getBalance();
if (balance < price) {
logger.info("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new Exception("用户{" + userId + "}余额不足");
}
}
}
package com.leolee.msf.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.dao.StorageMapper;
import com.leolee.msf.entity.storage.Product;
import com.leolee.msf.service.serviceInterface.StorageService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
* @ClassName StorageServiceImpl
* @Description: TODO
* @Author LeoLee
* @Date 2020/11/10
* @Version V1.0
**/
@Service("storageService")
public class StorageServiceImpl implements StorageService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private StorageMapper storageMapper;
@DS("db4")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@Override
public boolean reduceStock(Long productId, Integer amount) throws Exception {
logger.info("=====================Storage start===================");
logger.info("当前 XID: {}", RootContext.getXID());
//检查库存
this.checkSrock(productId, amount);
//扣除库存
logger.info("开始扣除{}库存{}件", productId, amount);
Product product = storageMapper.selectById(productId);
product.setStock(product.getStock() - amount);
Integer record = storageMapper.updateById(product);
logger.info("扣减 {} 库存结果:{}", productId, record > 0 ? "操作成功" : "扣减库存失败");
logger.info("=====================Storage end=====================");
return record > 0;
}
/*
* 功能描述: <br>
* 〈检查库存是否足够〉
* @Param: [productId, requiredAmount]
* @Return: void
* @Author: LeoLee
* @Date: 2020/11/10 22:26
*/
private void checkSrock(Long productId, Integer requiredAmount) throws Exception {
logger.info("检查{}库存", productId);
Product product = storageMapper.selectById(productId);
if (product.getStock() < requiredAmount) {
logger.info("商品{}库存不足,当前库存{}", productId, requiredAmount);
throw new Exception("商品{" + productId + "}库存");
}
}
}
Demo演示
初始化账户上只有1块钱
请求下单接口开始花钱:
执行成功,花了一块钱,现在账户上没钱了,再次请求要触发回滚机制:
直接抛出异常,Seata检测到异常直接开始全局事务的回滚