RocketMQ事务消费整理,使用反射和函数式编程进行本地事务的动态监听

  • Post author:
  • Post category:其他



目录


摘要:


1、RocketMQ事务消费应用流程


2、事务消费实现


优化方向:1、反射实现本地事务方法的动态监听,2、函数式编程实现监听方法传递


1、反射实现本地事务方法的动态监听


2、使用函数式编程的方式实现业务方法的传递


摘要:

整理RocketMQ事务消费应用流程,并使用反射和函数式编程技术进行优化,实现对本地事务方法的动态监听

1、RocketMQ事务消费应用流程

需求场景:

一个下定请求,由商品服务A完成商品锁定,定单服务B完成定单创建

一致性问题:

1、在锁定商品提交事务前发送消息,则商品锁定失败的情况下会创建定单

2、如果锁定商品成功提交事务后再发送消息,若发送消息异常则不会创建定单

3、引入事务消费

RocketMQ事务消费解决流程图

半消息:RocketMQ 发送的事务消费消息,消息发送到broker但不允许消费者消费,需要等待发送方提交commit状态后,半消息才能正常被消费

2、事务消费实现

商品服务A:下定业务触发方法

public void goodSubmit(){
    //发送下定半消息,数据仅测试使用
    long transactionId=System.currentTimeMillis();
    long orderId=123L;
    producerSvc.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(orderId).setHeader("TRANSACTION_ID",transactionId).build(),
            null
    );
    //发送完半消息后在本地事务监听器里面执行商品锁定方法goodLock()
}

商品服务A:需要进行事务监听的商品锁定方法

@Service
public class TransactionSvc {
    @Autowired
    RocketMQTemplate producerSvc;
    @Autowired
    TransactionLogDao logDao;

    @Transactional(isolation = Isolation.READ_COMMITTED,propagation = Propagation.REQUIRED)
    public boolean goodLock(long transactionId){
        //执行发送半消息后需要的业务逻辑
        System.out.println("模拟锁定商品,记录事务id");
        TransactionLog log=new TransactionLog();
        log.setId(transactionId);
        logDao.insert(log);
        //在同一事务里面,当商品锁定时,必然会记录事务id在日志表
        return true;
    }
}

RocketMQ监听事务实现

@RocketMQTransactionListener
public class MyLocalTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired
    TransactionLogDao logDao;
    @Autowired
    TransactionSvc transactionSvc;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message var1, Object var2){
        //执行本地事务
        try{
            MessageHeaders headers=var1.getHeaders();
            long transactionId=Long.parseLong(headers.get("TRANSACTION_ID").toString());
            //在监听下执行商品锁定方法
            transactionSvc.goodLock(transactionId);
            return RocketMQLocalTransactionState.COMMIT;
            //执行成功,提交事务,MQ接收到commit消息后会push半消息给消费者消费,定单服务消费并生成定单
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    //执行情况回查,如果因为其他异常原因rocketMQ没有收到commit消息会定期回查
    //逻辑自定义
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message var1){
        MessageHeaders headers=var1.getHeaders();
        long transactionId=(long)headers.get("TRANSACTION_ID");
        TransactionLog log=logDao.selectById(transactionId);
        if(null!=log){
            //如果存在日志,则事务执行成功
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            //如果不存在日志,则事务执行失败
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

思考:

 try{
            //在监听下执行商品锁定方法
            transactionSvc.goodLock(transactionId);
            return RocketMQLocalTransactionState.COMMIT;
            //执行成功,提交事务,MQ接收到commit消息后会push半消息给消费者消费,定单服务消费并生成定单
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }

查看上面代码片段,在上述的实现方式中,MyLocalTransactionListener类里面直接调用需要执行的业务事务方法,当出现别的事务消费业务时,比如要监听carLock(),车位锁定等,就不兼容了,因为只能定义一个RocketMQLocalTransactionListener的实现类

优化方向:1、反射实现本地事务方法的动态监听,2、函数式编程实现监听方法传递

1、反射实现本地事务方法的动态监听

商品服务A:修改业务发起方法

public void goodSubmitRefect(){
    //发送下定半消息,数据仅测试使用
    long transactionId=System.currentTimeMillis();
    long orderId=123L;
        //封装需要MQ监听的本地事务方法的信息
    Map params=new HashMap(3);
    Class c=TransactionSvc.class;
    Object[] args=new Object[1];
    args[0]=transactionId;
    Class[] argsClass=new Class[1];
    argsClass[0]=long.class;
    params.put("class",c);
    params.put("args",args);
    params.put("argsClass",argsClass);
    params.put("methodName","goodLock");
    producerSvc.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(orderId).setHeader("TRANSACTION_ID",transactionId).build(),
            params
    );
    //发送完半消息后在本地事务监听器里面执行商品锁定方法goodLock()!
}

商品服务A:本地事务方法监听类

@RocketMQTransactionListener
public class MyLocalTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired
    TransactionLogDao logDao;
    @Autowired
    TransactionSvc transactionSvc;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message var1, Object var2){
        //执行本地事务
        try{
            Map params=(Map)var2;
            Class c=(Class) params.get("class");
            Class[] argsClass=(Class[])params.get("argsClass");
            Object[] args=(Object[])params.get("args");
            String methodName=params.get("methodName").toString();
            Method method=c.getMethod(methodName,argsClass);
            method.invoke(c.newInstance(),args);
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message var1){
        MessageHeaders headers=var1.getHeaders();
        long transactionId=(long)headers.get("TRANSACTION_ID");
        TransactionLog log=logDao.selectById(transactionId);
        if(null!=log){
            //如果存在日志,则事务执行成功
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            //如果不存在日志,则事务执行失败
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

结论:传递需要被监听的业务方法的信息,使用反射技术进行方法调用,从而实现本地事务方法的动态监听,监听类与业务方法解耦

2、使用函数式编程的方式实现业务方法的传递

创建函数式接口

@FunctionalInterface
public interface LocalListenerFunction {
    boolean execute();
}

商品服务A:修改业务发起方法

public void goodSubmitFunction(){
    //发送下定半消息,数据仅测试使用
    long transactionId=System.currentTimeMillis();
    long orderId=123L;
    //这里注意一下,因为我的goodLock和goodSubmitFunction是在一个类里面,所以通过成员方法调用就可以
    LocalListenerFunction function=()->goodLock(transactionId);
    producerSvc.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(orderId).setHeader("TRANSACTION_ID",transactionId).build(),
            function
    );
    //发送完半消息后在本地事务监听器里面执行商品锁定方法goodLock()
}

商品服务A:本地事务方法监听类

@RocketMQTransactionListener
public class MyLocalTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired
    TransactionLogDao logDao;
    @Autowired
    TransactionSvc transactionSvc;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message var1, Object var2){
        //执行本地事务
        try{
            LocalListenerFunction function=(LocalListenerFunction)var2;
            function.execute();
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message var1){
        MessageHeaders headers=var1.getHeaders();
        long transactionId=(long)headers.get("TRANSACTION_ID");
        TransactionLog log=logDao.selectById(transactionId);
        if(null!=log){
            //如果存在日志,则事务执行成功
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            //如果不存在日志,则事务执行失败
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

结论:通过函数的传递,也可实现监听类和业务方法的解耦

LocalListenerFunction接口不采用函数式实现也可以

public void goodSubmitImplment(){
    //发送下定半消息,数据仅测试使用
    long transactionId=System.currentTimeMillis();
    long orderId=123L;
    producerSvc.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(orderId).setHeader("TRANSACTION_ID", transactionId).build(),
            new LocalListenerFunction() {
                @Override
                public boolean execute() {
                    return goodLock(transactionId);
                }
            }
    );
    //发送完半消息后在本地事务监听器里面执行商品锁定方法goodLock()
}

学习内容整理记录,如有错误感谢指正,互相交流,共同进步



版权声明:本文为qq_41066277原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。