rocketmq怎么保证消息一致性_可靠消息最终一致性【本地消息表、RocketMQ 事务消息方案】…

  • Post author:
  • Post category:其他


1 importcom.alibaba.fastjson.JSONObject;2 importorg.apache.rocketmq.spring.annotation.RocketMQTransactionListener;3 importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;4 importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionState;5 importorg.springframework.messaging.Message;6 importorg.springframework.transaction.annotation.Transactional;7

8 /**

9 *@authorAdministrator10 *@version1.011 **/

12 @Component13 @Slf4j14 //生产者组与发送消息时定义组相同

15 @RocketMQTransactionListener(txProducerGroup = “producer_group_txmsg_bank1”)16 public class ProducerTxmsgListener implementsRocketMQLocalTransactionListener {17

18 @Autowired19 AccountInfoService accountInfoService;20

21 @Autowired22 AccountInfoDao accountInfoDao;23

24 //事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调

25 @Override26 @Transactional27 publicRocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {28

29 try{30 //解析message,转成AccountChangeEvent

31 String messageString = new String((byte[]) message.getPayload());32 JSONObject jsonObject =JSONObject.parseObject(messageString);33 String accountChangeString = jsonObject.getString(“accountChange”);34 //将accountChange(json)转成AccountChangeEvent

35 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);36 //执行本地事务,扣减金额

37 accountInfoService.doUpdateAccountBalance(accountChangeEvent);38 //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费

39 returnRocketMQLocalTransactionState.COMMIT;40 } catch(Exception e) {41 e.printStackTrace();42 returnRocketMQLocalTransactionState.ROLLBACK;43 }

46 }47

48 //事务状态回查,查询是否扣减金额

49 @Override50 publicRocketMQLocalTransactionState checkLocalTransaction(Message message) {51 //解析message,转成AccountChangeEvent

52 String messageString = new String((byte[]) message.getPayload());53 JSONObject jsonObject =JSONObject.parseObject(messageString);54 String accountChangeString = jsonObject.getString(“accountChange”);55 //将accountChange(json)转成AccountChangeEvent

56 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);57 //事务id

58 String txNo =accountChangeEvent.getTxNo();59 int existTx =accountInfoDao.isExistTx(txNo);60 if(existTx>0){61 returnRocketMQLocalTransactionState.COMMIT;62 }else{63 returnRocketMQLocalTransactionState.UNKNOWN;64 }65 }66 }



版权声明:本文为weixin_39630048原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。