自己实现分布式事务

  • Post author:
  • Post category:其他


实现例子

我打开了两台虚拟机,分别命令为node1、node2,每台虚拟机上都安装了MySQL数据库,现在向node1上的数据库更新用户账户信息,向node2上的数据库新增用户消费信息。

在node1上创建账户表,建表语句如下:

CREATE TABLE ACCOUNTS

(

ID INT NOT NULL AUTO_INCREMENT COMMENT ‘自增主键’,

CUSTOMER_NO VARCHAR(25) NOT NULL COMMENT ‘客户号’,

CUSTOMER_NAME VARCHAR(25) NOT NULL COMMENT ‘客户名称’,

CARD_ID VARCHAR(18) NOT NULL COMMENT ‘身份证号’,

BANK_ID VARCHAR(25) NOT NULL COMMENT ‘开户行ID’,

BALANCE DECIMAL NOT NULL COMMENT ‘账户余额’,

CURRENCY VARCHAR(10) NOT NULL COMMENT ‘币种’,

PRIMARY KEY (ID)

)

COMMENT = ‘账户表’ ;

然后向表中插入一条记录,如下图:

在这里插入图片描述

在node2上创建用户消费历史表,建表语句如下:

CREATE TABLE USER_PURCHASE_HIS

(

ID INT NOT NULL AUTO_INCREMENT COMMENT ‘自增主键’,

CUSTOMER_NO VARCHAR(25) NOT NULL COMMENT ‘客户号’,

SERIAL_NO VARCHAR(32) NOT NULL COMMENT ‘交易流水号’,

AMOUNT DECIMAL NOT NULL COMMENT ‘交易金额’,

CURRENCY VARCHAR(10) NOT NULL COMMENT ‘币种’,

REMARK VARCHAR(100) NOT NULL COMMENT ‘备注’,

PRIMARY KEY (ID)

)

COMMENT = ‘用户消费历史表’;

复制代码

下面实现一个简陋的例子,代码如下:

1、创建DBUtil类,用来获取和关闭连接

package person.lb.example1;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

public class DBUtil {

static {
    try {
        //加载驱动类
        Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } 
}

//获取node1上的数据库连接
public static Connection getNode1Connection() {
    Connection conn = null;
    try {
        conn = (Connection) DriverManager.getConnection(
                "jdbc:mysql://192.168.0.108:3306/TEST", 
                "root", 
                "root");
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return conn;
}

//获取node2上的数据库连接
public static Connection getNode2Connection() {
    Connection conn = null;
    try {
        conn = (Connection) DriverManager.getConnection(
                "jdbc:mysql://192.168.0.109:3306/TEST", 
                "root", 
                "root");
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return conn;
}

//关闭连接
public static void close(ResultSet rs, Statement st, Connection conn) {
    try {
        if(rs != null) {
            rs.close();
        }
        if(st != null) {
            st.close();
        }
        if(conn != null) {
            conn.close();
        }
    } catch (SQLException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

2、创建XADemo类,用来测试事务

package person.lb.example1;

import java.sql.Connection;

import java.sql.SQLException;

import java.sql.Statement;

public class XADemo {

public static void main(String[] args) {

    //获取连接
    Connection node1Conn = DBUtil.getNode1Connection();
    Connection node2Conn = DBUtil.getNode2Connection();
    try {
        //设置连接为非自动提交
        node1Conn.setAutoCommit(false);
        node2Conn.setAutoCommit(false);
        //更新账户信息
        updateAccountInfo(node1Conn);
        //增加用户消费信息
        addUserPurchaseInfo(node2Conn);
        //提交
        node1Conn.commit();
        node2Conn.commit();
    } catch (SQLException e) {
        e.printStackTrace();
        //回滚
        try {
            node1Conn.rollback();
            node2Conn.rollback();
        } catch (SQLException e1) {
            e1.printStackTrace();
        }
    } finally {
        //关闭连接
        DBUtil.close(null, null, node1Conn);
        DBUtil.close(null, null, node2Conn);
    }
}

/**
 * 更新账户信息
 * @param conn
 * @throws SQLException 
 */
private static void updateAccountInfo(Connection conn) throws SQLException {
    Statement st = conn.createStatement();
    st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
}

/**
 * 增加用户消费信息
 * @param conn
 * @throws SQLException 
 */
private static void addUserPurchaseInfo(Connection conn) throws SQLException {
    Statement st = conn.createStatement();
    st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
            + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 100, 'CNY', '买衣服')");
}

}

这是一个没有发生任何异常的例子,执行结果是nod1上ACCOUNTS 表中的BALANCE字段的值成功更新为9900,而node2上USER_PURCHASE_HIS表中新增了一条记录,两个连接上的事务都成功完成,事务目标实现。如果反向测试一下,更改Insert语句,把其中某一个要插入的值改为NULL,由于字段都是非空限制,所以会发生异常,这个连接上的事务会失败,那么跟它关联的node1上的事务也必须回滚,不对数据库进行任何更改。经测试,结果与预期目标一致。说明这个例子是符合事务特性的。

但是这个例子不管是从代码的可读性和可维护性上来说都是比较差的。在使用spring开发项目的时候,配置了事务管理器以后,在我们的业务逻辑中几乎是察觉不到事务控制的,而且也看不到事务控制的代码。那么究竟spring中是怎么实现的事务控制呢,这篇博客中不会详细说明,但是要提到两个东西,事务管理器和资源管理器,现在自己来实现一个简单的事务管理器和资源管理器来对事务进行控制。

代码示例如下:

1、创建AbstractDataSource 类

package person.lb.datasource;

import java.sql.Connection;

import java.sql.SQLException;

public abstract class AbstractDataSource {

//获取连接
public abstract Connection getConnection() throws SQLException ;
//关闭连接
public abstract void close() throws SQLException;

}

2、创建Node1DataSource 类,用来连接node1上的数据库

package person.lb.datasource;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

public class Node1DataSource extends AbstractDataSource {

//使用ThreadLocal类保存当前线程使用的Connection
protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();

static {
    try {
        //加载驱动类
        Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } 
}

private final static Node1DataSource node1DataSource = new Node1DataSource();

private Node1DataSource() {}

public static Node1DataSource getInstance() {
    return node1DataSource;
}

/**
 * 获取连接
 */
@Override
public Connection getConnection() throws SQLException {
    Connection conn = null;
    if(threadSession.get() == null) {
        conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.108:3306/TEST", 
                    "root", 
                    "root");
        threadSession.set(conn);
    } else {
        conn = threadSession.get();
    }
    return conn;
}

/**
 * 关闭并移除连接
 */
@Override
public void close() throws SQLException {
    Connection conn = threadSession.get();
    if(conn != null) {
        conn.close();
        threadSession.remove();
    }
}

}

3、创建Node2DataSource类,用来连接node2机器上的数据库

package person.lb.datasource;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.SQLException;

public class Node2DataSource extends AbstractDataSource {

//使用ThreadLocal类保存当前线程使用的Connection
protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();
    
static {
    try {
        //加载驱动类
        Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } 
}

private static final Node2DataSource node2DataSource  = new Node2DataSource();

private Node2DataSource() {};

public static Node2DataSource getInstance() {
    return node2DataSource;
}

/**
 * 获取连接
 */
@Override
public Connection getConnection() throws SQLException {
    Connection conn = null;
    if(threadSession.get() == null) {
        conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.109:3306/TEST", 
                    "root", 
                    "root");
        threadSession.set(conn);
    } else {
        conn = threadSession.get();
    }
    return conn;
}

/**
 * 关闭并移除连接
 */
@Override
public void close() throws SQLException {
    Connection conn = threadSession.get();
    if(conn != null) {
        conn.close();
        threadSession.remove();
    }
}

}

4、创建Node1Dao类,在node1的数据库中更新账户信息

package person.lb.dao;

import java.sql.Connection;

import java.sql.SQLException;

import java.sql.Statement;

import person.lb.datasource.Node1DataSource;

public class Node1Dao {

private Node1DataSource dataSource = Node1DataSource.getInstance();

/**
 * 更新账户信息
 * @throws SQLException
 */
public void updateAccountInfo() throws SQLException {
    Connection conn = dataSource.getConnection();
    Statement st = conn.createStatement();
    st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
}

}

5、创建Node2Dao,在node2机器上增加用户消费信息

package person.lb.dao;

import java.sql.Connection;

import java.sql.SQLException;

import java.sql.Statement;

import person.lb.datasource.Node2DataSource;

public class Node2Dao {

private Node2DataSource dataSource = Node2DataSource.getInstance();

/**
 * 增加用户消费信息
 * @throws SQLException
 */
public void addUserPurchaseInfo() throws SQLException {
    Connection conn = dataSource.getConnection();
    Statement st = conn.createStatement();
    st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
            + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', null, 'CNY', '买衣服')");
}

}

6、创建NodeService类,把两个操作作为一个事务来执行

复制代码

package person.lb.service;

import java.sql.SQLException;

import person.lb.dao.Node1Dao;

import person.lb.dao.Node2Dao;

import person.lb.transaction.TransactionManager;

public class NodeService {

public void execute() {
    //启动事务
    TransactionManager.begin();
    
    Node1Dao node1Dao = new Node1Dao();
    Node2Dao node2Dao = new Node2Dao();
    try {
        node1Dao.updateAccountInfo();
        node2Dao.addUserPurchaseInfo();
        //提交事务
        TransactionManager.commit();
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

}

7、最后是测试类TestTx

复制代码

package person.lb.test;

import person.lb.service.NodeService;

public class TestTx {

public static void main(String[] args) {
    NodeService nodeService = new NodeService();
    nodeService.execute();
}

}

经测试,与第一个例子效果一致,但是从代码上来说要比第一个例子的可读性和可维护性高。不过这个例子并不能说明分布式事务中的事务管理器和资源管理器的真正原理,也不是一个可使用的代码,毕竟存在缺陷,而且dao层需要抛出异常才能实现事务的回滚。我想,作为一个理解分布式事务的作用的例子是够了。