Seata AT Mode for Distributed Transactions: Transaction Management for mybatis-plus Multiple Data Sources



Get the demo here:

See the msf-database-web module of the project.


Overview

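mybatis-plus is a well-regarded MyBatis enhancement that is widely used across the industry. Among its many features, one matters a great deal when building distributed systems: support for multiple data sources. Distributed designs call for splitting the business into separate parts, so splitting databases and tables becomes routine. That raises a question: when the tables an operation touches are spread across several databases, how do you keep the whole series of operations atomic, and how do you roll back transactions across different databases?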

Conveniently, mybatis-plus addresses exactly this problem: its multi-data-source feature supports Seata, Alibaba's distributed transaction solution.

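After a brief look, I found that the mybatis-plus multi-data-source sample that Seata provides on GitHub is not very approachable. It is not badly written, but the official demo configures and loads the data sources in code, so infrastructure code intrudes into the business code. I therefore built my own version from scratch, which also served as my first quick start with Seata.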


Code example


Building a mybatis-plus project with multiple data sources



First, set up a simple project and add the mybatis-plus dependency, the multi-data-source starter, and the database driver:


        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>



Configure the data sources:

server:
  port: 2003

spring:
  application:
    name: database-web
    dataSourceName1: client
    dataSourceName2: client2
  profiles:
    active: dev

logging:
  config: classpath:logback-spring.xml

# Expose all endpoints under /actuator
management:
  endpoints:
    web:
      exposure:
        include: "*"

mybatis-plus:
  mapper-locations: classpath*:**/mapper/*/*Mapper.xml

---
spring:
  profiles:
    active: dev
  cloud:
    config:
      name: ${spring.application.name}
      uri: http://localhost:1002/
      profile: dev
      label: master
      discovery:
        enabled: true
        service-id: config-center
    alibaba:
      seata:
        tx-service-group: my_test_tx_group
  redis:
    database: 0 # Redis database index
    host: 127.0.0.1
    port: 6379
    password: liyalong
    timeout: 6000ms
    lettuce:
      pool:
        max-idle: 8 # maximum number of idle connections, default 8
        max-active: 8 # maximum number of active connections, default 8
  datasource:
# Single data source configuration
#    driver-class-name: com.mysql.cj.jdbc.Driver
#    url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
#    username: root
#    password: lyl512240816
# Multiple data sources only
    dynamic:
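      primary: master # default data source or data source group; the default value is master
      strict: false # strict mode, off by default. When on, an unmatched data source name throws an exception; when off, the default data source is used.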
      datasource:
        master:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName1}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lyl512240816
        db2:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/msf-${spring.application.dataSourceName2}?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lyl512240816
        db3:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/seata_order?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lyl512240816
        db4:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/seata_storage?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lyl512240816
        db5:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3306/seata_pay?autoReconnect=true&autoReconnectForPools=true&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lyl512240816

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:1001/eureka/




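Use @DS to switch data sources.

@DS can be placed on a method or on a class. When both are present, the method-level annotation takes precedence over the class-level one. Annotating only the service implementation is strongly recommended.

Annotation        Result
No @DS            The default data source
@DS("dsName")     dsName can be a group name or the name of one specific database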
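
The precedence rule above (method first, then class, then the default data source) can be sketched in plain Java with reflection. This is only an illustration of the lookup order, not the library's implementation: the `Ds` annotation, the two service classes, and the `resolve` helpers are hypothetical stand-ins so that the sketch compiles without any dependency.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class DsResolutionSketch {

    // Hypothetical stand-in for @DS; the real annotation lives in dynamic-datasource.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Ds {
        String value();
    }

    // Mirrors spring.datasource.dynamic.primary from the configuration above.
    static final String PRIMARY = "master";

    @Ds("db2")
    static class AnnotatedService {
        @Ds("db4")
        public void reduceStock() { }   // method-level annotation wins

        public void listReports() { }   // falls back to the class-level annotation
    }

    static class PlainService {
        public void ping() { }          // no annotation at all
    }

    // Lookup order: method annotation, then class annotation, then the primary data source.
    static String resolve(Method method) {
        Ds onMethod = method.getAnnotation(Ds.class);
        if (onMethod != null) {
            return onMethod.value();
        }
        Ds onClass = method.getDeclaringClass().getAnnotation(Ds.class);
        if (onClass != null) {
            return onClass.value();
        }
        return PRIMARY;
    }

    // Convenience overload that looks the method up by name.
    static String resolve(Class<?> type, String methodName) {
        try {
            return resolve(type.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(AnnotatedService.class, "reduceStock")); // db4
        System.out.println(resolve(AnnotatedService.class, "listReports")); // db2
        System.out.println(resolve(PlainService.class, "ping"));            // master
    }
}
```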

At this point the project supports access to multiple data sources.


Adding Seata distributed transaction support



Add the Seata dependencies:

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>${seata.version}</version>
        </dependency>
    </dependencies>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.1.0.RELEASE</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>

Configure the tx-service-group (transaction groups will be covered in detail later):


spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: my_test_tx_group

Copy file.conf and registry.conf into the resources directory.

file.conf is Seata's core configuration file:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}
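
How the tx-service-group setting ties into file.conf is easiest to see as a two-step lookup: the group name selects a cluster name through vgroupMapping, and the cluster name selects the TC address through grouplist. The sketch below assumes the file-based registry used in this example; the flat `Map` is a stand-in for the parsed file.conf, and `sampleConf` and `resolveTcAddress` are hypothetical helper names rather than Seata API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TxGroupLookupSketch {

    // The two entries from the service block of file.conf, flattened into dotted keys.
    static Map<String, String> sampleConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("service.vgroupMapping.my_test_tx_group", "default");
        conf.put("service.default.grouplist", "127.0.0.1:8091");
        return conf;
    }

    // Step 1: transaction group -> cluster name. Step 2: cluster name -> TC address.
    static String resolveTcAddress(Map<String, String> conf, String txServiceGroup) {
        String cluster = conf.get("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroupMapping entry for group " + txServiceGroup);
        }
        String address = conf.get("service." + cluster + ".grouplist");
        if (address == null) {
            throw new IllegalStateException("no grouplist entry for cluster " + cluster);
        }
        return address;
    }

    public static void main(String[] args) {
        // The group name must match spring.cloud.alibaba.seata.tx-service-group.
        System.out.println(resolveTcAddress(sampleConf(), "my_test_tx_group")); // 127.0.0.1:8091
    }
}
```

The practical consequence is that the group name in the Spring configuration and the key suffix after vgroupMapping must be identical; a mismatch leaves the client without a TC address to connect to.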

registry.conf configures Seata's registry and configuration center (this example uses neither; both are set to the file type):

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  nacos {
    application = "seata-server"
    serverAddr = "localhost"
    namespace = ""
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    timeout = "0"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}


Download and run the TC (Transaction Coordinator)

Download it from GitHub:

https://github.com/seata/seata/releases

Unzip it locally after downloading.

Run the .bat file to start it. The TC listens on port 8091.


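Writing the business code

The example follows the scenario from the official sample: place an order, pay for it, and deduct stock.

Create the databases: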

# Order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
CREATE TABLE seata_order.orders
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    user_id          INT(11)        DEFAULT NULL,
    product_id       INT(11)        DEFAULT NULL,
    pay_amount       DECIMAL(10, 0) DEFAULT NULL,
    status           VARCHAR(100)   DEFAULT NULL,
    add_time         DATETIME       DEFAULT CURRENT_TIMESTAMP,
    last_update_time DATETIME       DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;
CREATE TABLE seata_order.undo_log
(
    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20)   NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11)      NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8
;
# Storage
DROP DATABASE IF EXISTS seata_storage;
CREATE DATABASE seata_storage;
CREATE TABLE seata_storage.product
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    price            DOUBLE   DEFAULT NULL,
    stock            INT(11)  DEFAULT NULL,
    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;
INSERT INTO seata_storage.product (id, price, stock)
VALUES (1, 5, 10);
CREATE TABLE seata_storage.undo_log
(
    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20)   NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11)      NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;

# Pay
DROP DATABASE IF EXISTS seata_pay;
CREATE DATABASE seata_pay;
CREATE TABLE seata_pay.account
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    balance          DOUBLE   DEFAULT NULL,
    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;
CREATE TABLE seata_pay.undo_log
(
    id            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20)   NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11)      NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;
INSERT INTO seata_pay.account (id, balance)
VALUES (1, 1);

SELECT auto_increment
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'seata_order'
  AND TABLE_NAME = 'undo_log'

Controller:

package com.leolee.msf.controller;

import com.leolee.msf.entity.OperationResponse;
import com.leolee.msf.entity.order.PlaceOrderRequestVO;
import com.leolee.msf.service.serviceInterface.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName OrderController
 * @Description: Orders
 * @Author LeoLee
 * @Date 2020/11/10
 * @Version V1.0
 **/
@RestController
@RequestMapping("/order")
public class OrderController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    @Autowired
    private OrderService orderService;


    /*
     * Places an order.
     * @Param: [placeOrderRequestVO]
     * @Return: com.leolee.msf.entity.OperationResponse
     * @Author: LeoLee
     * @Date: 2020/11/10 21:27
     */
    @RequestMapping(value = "/placeOrder")
    public OperationResponse placeOrder(@RequestBody PlaceOrderRequestVO placeOrderRequestVO) throws Exception {

        logger.info("=======================================================");
        logger.info("Place-order request: userId=[{}], productId=[{}], price=[{}]", placeOrderRequestVO.getUserId(), placeOrderRequestVO.getProductId(), placeOrderRequestVO.getPrice());

        return orderService.placeOrder(placeOrderRequestVO);
    }
}

Service:

package com.leolee.msf.service;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.dao.OrderMapper;
import com.leolee.msf.entity.OperationResponse;
import com.leolee.msf.entity.order.Order;
import com.leolee.msf.entity.order.OrderStatus;
import com.leolee.msf.entity.order.PlaceOrderRequestVO;
import com.leolee.msf.service.serviceInterface.OrderService;
import com.leolee.msf.service.serviceInterface.PayService;
import com.leolee.msf.service.serviceInterface.StorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @ClassName OrderServiceImpl
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/10
 * @Version V1.0
 **/
@Service("orderService")
public class OrderServiceImpl implements OrderService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private StorageService storageService;

    @Autowired
    private PayService payService;

    @GlobalTransactional
    @DS("db3")
    @Override
    public OperationResponse placeOrder(PlaceOrderRequestVO placeOrderRequestVO) throws Exception {


        logger.info("=====================Order start===================");
        logger.info("Current XID: {}", RootContext.getXID());

        // Each customer may buy only one item
        final Integer amount = 1;
        final Integer price = placeOrderRequestVO.getPrice();

        // Create the order
        Order order = Order.builder()
                .productId(placeOrderRequestVO.getProductId())
                .userId(placeOrderRequestVO.getUserId())
                .payAmount(placeOrderRequestVO.getPrice())
                .status(OrderStatus.INIT)
                .build();

        int insertOrderResult = orderMapper.insert(order);
        logger.info("Saving the order {}", insertOrderResult > 0 ? "succeeded" : "failed");

        // Deduct the stock
        boolean operationStorageResult = storageService.reduceStock(placeOrderRequestVO.getProductId(), amount);

        // Deduct the user's balance
        boolean operationBalanceResult = payService.reduceBalance(placeOrderRequestVO.getUserId(), price);

        // Update the order status
        order.setStatus(OrderStatus.SUCCESS);
        Integer updateOrderRecord = orderMapper.updateById(order);
        logger.info("Updating order {} {}", order.getId(), updateOrderRecord > 0 ? "succeeded" : "failed");

        logger.info("=====================Order end====================");

        return OperationResponse.builder()
                .success(operationStorageResult && operationBalanceResult)
                .build();
    }
}

package com.leolee.msf.service;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.entity.pay.Account;
import com.leolee.msf.dao.AccountMapper;
import com.leolee.msf.service.serviceInterface.PayService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * @ClassName PayServiceImpl
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/10
 * @Version V1.0
 **/
@Service("payService")
public class PayServiceImpl implements PayService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    @Autowired
    private AccountMapper accountMapper;


    @DS("db5")
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    @Override
    public boolean reduceBalance(Long userId, Integer price) throws Exception {

        logger.info("======================Pay start=======================");
        logger.info("Current XID: {}", RootContext.getXID());

        // Check the balance
        checkBalance(userId, price);

        logger.info("Deducting from the balance of user {}", userId);
        Account account = accountMapper.selectById(userId);
        account.setBalance(account.getBalance() - price);
        Integer record = accountMapper.updateById(account);
        logger.info("Balance deduction for user {}: {}", userId, record > 0 ? "succeeded" : "failed");

        logger.info("======================Pay end=========================");
        return record > 0;
    }


    /*
     * Checks whether the account balance is enough to pay.
     * @Param: [userId, price]
     * @Return: void
     * @Author: LeoLee
     * @Date: 2020/11/10 22:40
     */
    private void checkBalance(Long userId, Integer price) throws Exception {

        logger.info("Checking the balance of user {}", userId);

        Account account = accountMapper.selectById(userId);
        Integer balance = account.getBalance();

        if (balance < price) {
            logger.info("Insufficient balance for user {}, current balance: {}", userId, balance);
            throw new Exception("Insufficient balance for user {" + userId + "}");
        }
    }
}

package com.leolee.msf.service;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.leolee.msf.dao.StorageMapper;
import com.leolee.msf.entity.storage.Product;
import com.leolee.msf.service.serviceInterface.StorageService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * @ClassName StorageServiceImpl
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/10
 * @Version V1.0
 **/
@Service("storageService")
public class StorageServiceImpl implements StorageService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    @Autowired
    private StorageMapper storageMapper;

    @DS("db4")
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    @Override
    public boolean reduceStock(Long productId, Integer amount) throws Exception {

        logger.info("=====================Storage start===================");
        logger.info("Current XID: {}", RootContext.getXID());

        // Check the stock
        this.checkStock(productId, amount);

        // Deduct the stock
        logger.info("Deducting {} item(s) from the stock of product {}", amount, productId);
        Product product = storageMapper.selectById(productId);
        product.setStock(product.getStock() - amount);
        Integer record = storageMapper.updateById(product);
        logger.info("Stock deduction for product {}: {}", productId, record > 0 ? "succeeded" : "failed");

        logger.info("=====================Storage end=====================");
        return record > 0;
    }


    /*
     * Checks whether the stock is sufficient.
     * @Param: [productId, requiredAmount]
     * @Return: void
     * @Author: LeoLee
     * @Date: 2020/11/10 22:26
     */
    private void checkStock(Long productId, Integer requiredAmount) throws Exception {

        logger.info("Checking the stock of product {}", productId);

        Product product = storageMapper.selectById(productId);
        if (product.getStock() < requiredAmount) {
            // Log the actual stock, not the requested amount
            logger.info("Insufficient stock for product {}, current stock: {}", productId, product.getStock());
            throw new Exception("Insufficient stock for product {" + productId + "}");
        }
    }
}


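Demo

The account starts with a balance of just 1 yuan.

Call the place-order endpoint to start spending:

The call succeeds and spends 1 yuan, leaving the account empty. Calling it again triggers the rollback:

The service throws an exception. Seata detects it and rolls back the global transaction.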
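
To make the two demo calls concrete, here is a toy model of the idea behind the undo_log tables: each write records how to restore the previous value, and a failure replays those records in reverse. It is a deliberately simplified, in-memory sketch and not how Seata is implemented; in the real setup the undo records are rows in each database's undo_log table and the TC coordinates the branches. All class and method names here are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class UndoLogSketch {

    // In-memory stand-ins for seata_order.orders, seata_storage.product and seata_pay.account.
    final Map<Integer, String> orders = new HashMap<>();
    final Map<Integer, Integer> stock = new HashMap<>();
    final Map<Integer, Integer> balance = new HashMap<>();

    // Undo records for the current "global transaction", newest first.
    private final Deque<Runnable> undoLog = new ArrayDeque<>();
    private int nextOrderId = 1;

    UndoLogSketch() {
        stock.put(1, 10);   // product 1 starts with 10 items, as in the SQL script
        balance.put(1, 1);  // account 1 starts with a balance of 1, as in the SQL script
    }

    // Returns true if every step succeeded, false if the whole operation was rolled back.
    boolean placeOrder(int userId, int productId, int price) {
        undoLog.clear();
        try {
            int orderId = nextOrderId++;
            orders.put(orderId, "INIT");
            undoLog.push(() -> orders.remove(orderId));   // undo: delete the inserted order

            deduct(stock, productId, 1, "insufficient stock");
            deduct(balance, userId, price, "insufficient balance");

            orders.put(orderId, "SUCCESS");
            return true;
        } catch (IllegalStateException e) {
            // Replay the undo records in reverse order of the writes.
            while (!undoLog.isEmpty()) {
                undoLog.pop().run();
            }
            return false;
        }
    }

    // Checks the current value, writes the new one, and records the old value for undo.
    private void deduct(Map<Integer, Integer> table, int id, int delta, String error) {
        int before = table.getOrDefault(id, 0);
        if (before < delta) {
            throw new IllegalStateException(error);
        }
        table.put(id, before - delta);
        undoLog.push(() -> table.put(id, before));
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        System.out.println(db.placeOrder(1, 1, 1)); // true: balance 1 -> 0, stock 10 -> 9
        System.out.println(db.placeOrder(1, 1, 1)); // false: balance check fails, writes undone
        System.out.println(db.stock.get(1) + " " + db.balance.get(1) + " " + db.orders);
    }
}
```

In the second call the order insert and the stock deduction have already happened when the balance check fails, which is the situation the rollback in the demo has to clean up.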



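Copyright notice: This is an original article by qq_25805331, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.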