什么是事务性消息(transactional message)?
可以认为是一个确保分布式系统中的最终一致性的两阶段提交(two-phase commit )的消息实现。
事务性消息确保本地事务的执行和消息的发送是一个原子性操作。
关于two-phase commit:https://blog.csdn.net/lengxiao1993/article/details/88290514
rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。
生产者,第一个阶段确保消息发送成功了;
第二个阶段执行本地操作,根据本地操作的执行结果来决定发送的那条消息是否需要消费或回滚。
使用限制
- 事务性消息不支持定时和批处理
- 为了避免单个消息被check多次从而引起大部分队列消息的堆积,我们限制单个消息的check次数默认为15次,用户可以通过配置
transactionCheckMax
参数更改。如果一个消息checked超过了“transactionCheckMax”,这个消息就会被丢弃,同时默认打印一条error log。用户可以通过重写AbstractTransactionCheckListener
来自定义处理方式。 - 根据配置文件中的
transactionTimeout
设置,事务性消息经过“transactionTimeout”时间后会被checked。用户可以在发送事务性消息时设置CHECK_IMMUNITY_TIME_IN_SECONDS
属性,该属性的优先级高于transactionMsgTimeout
。 - 一个事务性消息可能会checked或consumed多次
- 已经commited的消息再次发送到用户的目标topic可能会失败。目前依靠日志记录(log record)。RocketMQ本身的高可用机制确保了高可用性。如果想确保事务性消息没有丢失并且保证了事务的完整性,可以使用同步双写机制(synchronous double write. mechanism)。
- 事务性消息的Producer IDs不能与其他类型消息的Producer IDs共享。与其他类型的消息不同,事务性消息允许反向查询,通过Producer ID查询client。
Application
事务的状态
事务性消息有3种状态
TransactionStatus.CommitTransaction
:提交事务,即允许消费者消费这条消息TransactionStatus.RollbackTransaction
:回滚事务,该消息会被删除,不允许消费TransactionStatus.Unknown
:中间状态,MQ需要重新check来确定消息的状态
发送事务性消息
1 、创建事务producer
使用TransactionMQProducer
类创建producer client,指定一个唯一的producerGroup,可以自定义线程池来处理check请求,本地事务执行完之后,根据执行结构来回复MQ,即确定该条消息的状态(上面3种状态)。
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
2、实现TransactionListener
接口
-
when send half message succeed,使用
executeLocalTransaction
来执行本地事务,返回三种状态之一。 -
方法
checkLocalTransaction
用来check本地事务状态,并且响应MQ的check请求。它也返回三种状态之一。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
原文:http://rocketmq.apache.org/docs/transaction-example/
参考:https://www.cnblogs.com/xuwc/p/9034029.html
版权声明:本文为alex_hh原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。