1 importcom.alibaba.fastjson.JSONObject;2 importorg.apache.rocketmq.spring.annotation.RocketMQTransactionListener;3 importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;4 importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionState;5 importorg.springframework.messaging.Message;6 importorg.springframework.transaction.annotation.Transactional;7
8 /**
9 *@authorAdministrator10 *@version1.011 **/
12 @Component13 @Slf4j14 //生产者组与发送消息时定义组相同
15 @RocketMQTransactionListener(txProducerGroup = “producer_group_txmsg_bank1”)16 public class ProducerTxmsgListener implementsRocketMQLocalTransactionListener {17
18 @Autowired19 AccountInfoService accountInfoService;20
21 @Autowired22 AccountInfoDao accountInfoDao;23
24 //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
25 @Override26 @Transactional27 publicRocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {28
29 try{30 //解析message,转成AccountChangeEvent
31 String messageString = new String((byte[]) message.getPayload());32 JSONObject jsonObject =JSONObject.parseObject(messageString);33 String accountChangeString = jsonObject.getString(“accountChange”);34 //将accountChange(json)转成AccountChangeEvent
35 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);36 //执行本地事务,扣减金额
37 accountInfoService.doUpdateAccountBalance(accountChangeEvent);38 //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
39 returnRocketMQLocalTransactionState.COMMIT;40 } catch(Exception e) {41 e.printStackTrace();42 returnRocketMQLocalTransactionState.ROLLBACK;43 }
46 }47
48 //事务状态回查,查询是否扣减金额
49 @Override50 publicRocketMQLocalTransactionState checkLocalTransaction(Message message) {51 //解析message,转成AccountChangeEvent
52 String messageString = new String((byte[]) message.getPayload());53 JSONObject jsonObject =JSONObject.parseObject(messageString);54 String accountChangeString = jsonObject.getString(“accountChange”);55 //将accountChange(json)转成AccountChangeEvent
56 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);57 //事务id
58 String txNo =accountChangeEvent.getTxNo();59 int existTx =accountInfoDao.isExistTx(txNo);60 if(existTx>0){61 returnRocketMQLocalTransactionState.COMMIT;62 }else{63 returnRocketMQLocalTransactionState.UNKNOWN;64 }65 }66 }