SEATA

  • Post author:
  • Post category:其他




SEATA集成记录



背景

在分布式系统中,每个节点都可以知晓自己操作的成功或者失败,但无法知道其他节点操作的成功或失败。
当一个事务跨多个节点时,为保持事务的原子性与一致性,而引入一个协调者来统一掌控所有参与者的操作结果。
并指示他们是否要把操作结果进行真正的提交或者回滚

这次集成的项目环境为spring-cloud+mybatis-plus-Oracle+eureka;



常见分布式协议(2PC和3PC)



2PC(二阶段提交)

2PC是常用的分布式事务解决方案,即将事务的提交过程分为两个阶段来进行处理-----准备阶段,提交阶段

  • 第一阶段(投票阶段):


    1. 协调者向所有参与者发送事务内容,询问是否可以提交事务,并等待答复

    2. 各参与者执行事务操作,将 undo 和 redo 信息记入事务日志中(但不提交事务)

    3. 如参与者执行成功,给协调者反馈同意,否则反馈中止

    第一阶段


  • 第二阶段(提交执行阶段):



    A

    . 当协调者节点从所有参与者节点获得的相应消息都为同意时

     协调者节点向所有参与者节点发出正式提交(commit)的请求。
     参与者节点正式完成操作,并释放在整个事务期间内占用的资源。
     参与者节点向协调者节点发送ack完成消息。
     协调者节点收到所有参与者节点反馈的ack完成消息后,完成事务。
    

第二阶段提交


B

:如果任一参与者节点在第一阶段返回的响应消息为中止,或者协调者节点在第一阶段的询问超时之前无法获取所有参与者节点的响应消息时

协调者节点向所有参与者节点发出回滚操作(rollback)的请求。
参与者节点利用阶段1写入的undo信息执行回滚,并释放在整个事务期间内占用的资源。
参与者节点向协调者节点发送ack回滚完成消息。
协调者节点受到所有参与者节点反馈的ack回滚完成消息后,取消事务。

第二阶段回滚

不管最后结果如何,第二阶段都会结束当前事务。



二阶段提交的优缺点:

优点:尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域。(其实也不能100%保证强一致)
缺点:实现复杂,牺牲了可用性,对性能影响较大,不适合高并发高性能场景。


主要问题



1. 性能问题:

执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。

2. 可靠性问题:

参与者发生故障。协调者需要给每个参与者额外指定超时机制,超时后整个事务失败。协调者发生故障。参与者会一直阻塞下去。需要额外的备机进行容错。

3. 数据一致性问题:

二阶段无法解决的问题:协调者在发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。



3PC(三阶段提交)

3PC三阶段提交协议,是二阶段提交协议的改进版本,三阶段提交有两个改动点:
	1. 在协调者和参与者中都引入超时机制
	2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的

也就是说,除了引入超时机制之外,3PC把2PC的准备阶段再次一分为二

这样三阶段提交就有:

CanCommit,PreCommit,DoCommit三个阶段

在这里插入图片描述


一阶段:CanCommit阶段

3PC的CanCommit阶段其实和2PC的准备阶段很像。
协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。
  1. 事务询问 协调者向所有参与者发出包含事务内容的CanCommit请求,询问是否可以提交事务,并等待所有参与者答复。
  2. 响应反馈 参与者收到CanCommit请求后,如果认为可以执行事务操作,则反馈 yes 并进入预备状态,否则反馈 no。


二阶段:PreCommit阶段

协调者根据参与者的反应情况来决定是否可以进行事务的PreCommit操作。根据响应情况,有以下两种可能。


A

:假如所有参与者均反馈 yes,协调者预执行事务。

  1. 发送预提交请求 :协调者向参与者发送PreCommit请求,并进入准备阶段
  2. 事务预提交 :参与者接收到PreCommit请求后,会执行事务操作,并将undo和redo信息记录到事务日志中(但不提交事务)
  3. 响应反馈 :如果参与者成功的执行了事务操作,则返回ACK响应,同时开始等待最终指令。

    在这里插入图片描述


B

:假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。

  1. 发送中断请求 :协调者向所有参与者发送abort请求
  2. 中断事务 :参与者收到来自协调者的abort请求之后(或超时之后,仍未收到协调者的请求),执行事务的中断。

    在这里插入图片描述


三阶段:DoCommit阶段

进入阶段3后无论协调者出现问题,或者协调者与参与者网络出现问题,都会导致参与者无法
接收到协调者发出的doCommit或abort请求。此时,参与者都会在等待超时之后,继续执行事务提交。
一句话概括就是:
当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,
但是他有理由相信:成功提交的几率很大。

该阶段进行真正的事务提交,也可以分为以下两种情况。


A

:所有参与者均反馈 ack 响应,执行真正的事务提交

  1. 发送提交请求

    协调接收到参与者发送的ACK响应,那么他将从预提交状态进入到提交状态。并向所有参与者发送doCommit请求
  2. 事务提交

    参与者接收到doCommit请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。
  3. 响应反馈

    事务提交完之后,向协调者发送ack响应。
  4. 完成事务

    协调者接收到所有参与者的ack响应之后,完成事务。

    在这里插入图片描述


B

:任何一个参与者反馈 no,或者等待超时后协调者尚无法收到所有参与者的反馈,即中断事务

  1. 发送中断请求

    如果协调者处于工作状态,向所有参与者发出 abort 请求
  2. 事务回滚

    参与者接收到abort请求之后,利用其在阶段二记录的undo信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
  3. 反馈结果

    参与者完成事务回滚之后,向协调者反馈ACK消息
  4. 中断事务

    协调者接收到参与者反馈的ACK消息之后,执行事务的中断。

    在这里插入图片描述



三阶段提交的优缺点

优点:

相比二阶段提交,三阶段提交降低了阻塞范围,在等待超时后协调者或参与者会中断事务。
避免了协调者单点问题,阶段 3 中协调者出现问题时,参与者会继续提交事务。

缺点:

数据不一致问题依然存在,当在参与者收到preCommit请求后等待doCommit指令时,
此时如果协调者请求中断事务,而协调者无法与参与者正常通信,会导致参与者继续提交事务,造成数据不一致。

.

.

.



开始集成



项目环境

spring-cloud+mybatis-plus-Oracle+eureka



spring集成seata



seata相关术语

TC(事务协调者):维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM(事务管理器):定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM(资源管理器):管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态
UNDO_LOG表: 回滚日志
			a.undo_log表必须在每个业务数据库中创建,用于保存回滚操作数据
			b.当全局提交时,undo_log记录直接删除
			c.当全局回滚时,将现有数据撤销,还原至操作前的状态



AT模式介绍


特点

:对业务无入侵式,整体机制分二阶段提交


两阶段提交协议的演变:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
	提交异步化,非常快速地完成。
	回滚通过一阶段的回滚日志进行反向补偿。


具体实现步骤:

TM端使用@GlobalTransaction进行全局事务开启、提交、回滚
TM开始RPC调用远程服务
RM端 seata-client 通过扩展DataSourceProxy,实现自动生成UNDO_LOG 与 TC上报
TM告知TC提交/回滚全局事务
TC通知RM各自执行commit/rollback操作,同时清除undo_log


AT模式中,seata是通过数据源代理类 DataSourceProxy对数据库进行操作。


业务通过‘JDBC标准接口’访问数据库资源时,数据源代理会拦截所有的请求,除了执行原请求,还会做一些分布式事务相关的工作,包括生成前、后镜像,加锁数据,保存事务日志。


seata中,每个参与AT模式事务的数据库被看作一个资源。


每个本地事务提交前,seata的资源管理器RM都会向事务协调器TC中注册一个分支事务。


每个分支事务对应于插入的一行事务日志。


分支事务注册成功后,会提交本地事务。在本地事务提交后,资源管理器RM向事务协调器汇报分支事务状态

**;**


全局事务中所有操作都完成后,事务管理器根据【执行时是否捕获异常】来决定提交全局事务还是回滚全局事务,通知TC提交或回滚分支事务,进入二阶段处理;


TC找出该分布式事务的所有分支事务,向每个分支事务所对应的资源管理器发起二阶段提交或回滚操作,RM根据分支事务ID,从事务日志表找到对应的事务日志,基于日志完成二阶段处理;


前镜像(beforeImage)和后镜像(afterImage)都会保存在undo_log表的rollback_info字段中,前者记录修改之前的数据,后者记录“写”操作之后的数据,如果发生回滚,则根据前、后镜像构建回滚语句,恢复到事务进行前的状态;



1.搭建SEATA TC协调者

  1. 下载seata1.4.2版本zip并解压缩(

    https://seata.io/zh-cn/blog/download.html



    在这里插入图片描述

  2. 修改registry.conf文件(conf目录下)

    修改registry.type为 eureka
    配置registry.eureka中的信息
    修改config.type为 file
    配置config.file中的信息
    

在这里插入图片描述

在这里插入图片描述

3. 修改file.conf文件(conf目录下)

修改store.mode 为 db
配置db连接信息

在这里插入图片描述

4. 修改logback.xml(conf目录下)

配置LOG_HOME和相应的appender(可复制直接替换,注意LOG_HOME位置配置为自己要存放日志的位置)
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~  Copyright 1999-2019 Seata.io Group.
  ~
  ~  Licensed under the Apache License, Version 2.0 (the "License");
  ~  you may not use this file except in compliance with the License.
  ~  You may obtain a copy of the License at
  ~
  ~       http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~  Unless required by applicable law or agreed to in writing, software
  ~  distributed under the License is distributed on an "AS IS" BASIS,
  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~  See the License for the specific language governing permissions and
  ~  limitations under the License.
  -->

<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <!-- Context listeners -->
    <contextListener class="io.seata.server.logging.listener.SystemPropertyLoggerContextListener"/>

    <!-- Copied from spring-boot.jar -->
    <conversionRule conversionWord="clr" converterClass="io.seata.server.logging.logback.ColorConverter"/>
    <conversionRule conversionWord="wex" converterClass="io.seata.server.logging.logback.WhitespaceThrowableProxyConverter"/>
    <conversionRule conversionWord="wEx" converterClass="io.seata.server.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
	<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx"/>
    <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p --- [%t] %-40.40logger{39} : %m%n%wEx"/>
    <!-- common properties -->
    <property name="APPLICATION_NAME" value="seata-server"/>
	<property name="LOG_HOME" value="E:\SEATA\1.4.2\seata\seata-server-1.4.2\logs"/>


    <!-- console-appender -->
    <!-- <include resource="logback/console-appender.xml"/> -->
	<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- file-appender -->
    <!-- <include resource="logback/file-appender.xml"/> -->
	<!--ALL-->
    <appender name="ALL" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/txc.${PORT}.all.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/history/txc.${PORT}.all.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
            <maxFileSize>2GB</maxFileSize>
            <MaxHistory>7</MaxHistory>
            <totalSizeCap>7GB</totalSizeCap>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
        <encoder>
            <Pattern>${FILE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- logstash-appender: off by default -->
    <!--<include resource="logback/logstash-appender.xml"/>-->

    <!-- kafka-appender: off by default -->
    <!--<include resource="logback/kafka-appender.xml"/>-->
	
	<!--WARN-->
    <appender name="WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>WARN</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <file>${LOG_HOME}/txc.${PORT}.warn.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/history/txc.${PORT}.warn.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
            <maxFileSize>2GB</maxFileSize>
            <MaxHistory>7</MaxHistory>
            <totalSizeCap>7GB</totalSizeCap>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
        <encoder>
            <Pattern>${FILE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!--ERROR-->
    <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <file>${LOG_HOME}/txc.${PORT}.error.log</file>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/history/txc.${PORT}.error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
            <maxFileSize>2GB</maxFileSize>
            <MaxHistory>7</MaxHistory>
            <totalSizeCap>7GB</totalSizeCap>
            <cleanHistoryOnStart>true</cleanHistoryOnStart>
        </rollingPolicy>
        <encoder>
            <Pattern>${FILE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <!-- console-appender -->
        <!-- file-appender -->
        <appender-ref ref="ALL"/>
        <appender-ref ref="WARN"/>
        <appender-ref ref="ERROR"/>
		<appender-ref ref="CONSOLE"/>

        <!-- logstash-appender: off by default -->
        <!--<appender-ref ref="LOGSTASH"/>-->

        <!-- kafka-appender: off by default -->
        <!--<appender-ref ref="KAFKA"/>-->
    </root>
</configuration>

5.添加数据库驱动jar包

由于我们使用的是Oracle数据库,seata自带的lib下没有对应的Oracle数据库驱动,所以需要手动添加相应的ojdbcX.jar,
已测试过ojdbc5,ojdbc6,ojdbc7,ojdbc8均可正常启动,建议使用ojdbc8
直接将对应的jar包放至lib文件夹下即可
  1. 启动seata TC server

     进入seata-bin目录下会看到两个启动脚本,分别为.bat和.sh脚本,对应windows启动和Linux下启动
     Windows系统可以直接双击seata-server.bat启动,也可在cmd窗口中输入seata-server.bat 启动(同时可以指定端口,若未指定,则按照默认端口8091启动)
     Linux系统中,则可以使用命令 nohup ./seata-server.sh >log.out 2>1 &  启动
    

成功启动后,可以看到,seata TC server已成功注册到eureka

在这里插入图片描述



2. 服务集成seata



添加pom依赖,导入相应jar包

		<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2021.1</version>
            <!-- 自带版本较低,需排除后重新手动引入seata-spring-boot-starter -->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
            <version>2.2.0.M1</version>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-okhttp</artifactId>
            <version>10.2.3</version>
        </dependency>



application.yml文件中添加相应配置

seata:
  enabled: true
  application-id: ${spring.application.name}
  #事务分组配置(在v1.5之后默认值为default_tx_group)
  #此处事务分组名称需要和TC端配置的事务分组名称一致,否则会导致服务无法注册到TC端
  tx-service-group: my_tx_group



添加配置文件到src下

在这里插入图片描述

	file.conf
service{
    vgroupMapping.my_tx_group = "default"
    default.grouplist = "127.0.0.1:8091"
    server.recovery.timeoutRetryPeriod = "5000"
}

store{
    mode = "db"

    db {
        ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
        datasource = "druid"
        ## mysql/oracle/postgresql/h2/oceanbase etc.
        dbType = "oracle"
        driverClassName = "oracle.jdbc.driver.OracleDriver"
        url = "jdbc:oracle:thin:@xxx.xxx.x.xxx:1521/xxxxx"
        user = "xxxxxx"
        password = "xxxxxxxxxx"
        minConn = 5
        maxConn = 30
        globalTable = "GLOBAL_TABLE"
        branchTable = "BRANCH_TABLE"
        lockTable = "LOCK_TABLE"
        queryLimit = 100
        maxWait = 5000
    }
}
	registy.conf
registry {
    type = "eureka"

    eureka {
        serviceUrl = "http://127.0.0.1:8761/eureka"
        application = "seata-server"
        weight = "1"
      }
}

config{
    type = "file"

    file {
        name = "file.conf"
      }
}

至此,服务集成seata已完成,可以看到服务启动后会显示已注册至seata

在这里插入图片描述



3.如何使用

	首先,原来以公共包形式引入的service接口可以不再使用,改为feign的形式进行远程调用;
	其次,在需要进行分布式事务控制的实现层的方法上使用@GlobalTransactional注解;
	需要注意的是:spring的事务控制注解@Transactional只能用在和@GlobalTransactional同一级别或@GlobalTransactional的子级,即全局事务控制应处于最外层
	如果被调用的地方对异常进行了捕获,需要调用该服务或feign方法的业务中能获取到下游RM执行失败,并调用api进行手动全局回滚
	API:GlobalTransactionContext.reload(RootContext.getXID()).rollback();

在这里插入图片描述

	对下游服务捕获了异常的处理,可以这样操作
			String flag = this.flowableStartCommit(flowableModelVo);
            if(!flag.equals("success")){
                GlobalTransactionContext.reload(RootContext.getXID()).rollback();
                return ResultVO.showErrorMessage(500,"操作失败",null);
            }



注意:

全局事务中所有操作都完成后,事务管理器根据【执行时是否捕获异常】来决定提交全局事务还是回滚全局事务。

所以尽量不要在service层捕获异常,可直接抛出,在controller层进行捕获,返回给前端相应的错误代码



版权声明:本文为weixin_45682377原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。