本翻译教程来自RocketMQ
官方网站
,中间会加上自己的理解,有错误或者不妥之处请及时指正。本“有序消息”教程与原官方示例有所不同,但是更加突出了重点。另外,最开始写RocketMQ官方教程的时候,觉得官方示例还可以,有注释,可以辅助理解。到后来官方教程越来越差,所以本文也是在原来的基础上扩充了很多内容以辅助读者理解。
我在示例代码中加了详细的注释,如果读者想要方便的下载这些代码,包括后面所有博客中有关RocketMQ教程中用到的代码,请到
我的GitHub仓库
进行下载。
要保证有序消息,需要从生产者和消费者两个方面进行保证。
首先是生产者。生产者方面通过队列选择器来选择要将消息发送到哪一个消息队列中,因为同一个topic有多个消息队列与之对应(默认是4个)。如果不加以选择,将会随机存放,那么就不能保证同一个orderid的消息被顺序的消费。
package org.apache.rocketmq.example.ordermessage;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
// 实例化DefaultMQProducer,这里如要设置生产者组名。
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
// 指明name server的地址
producer.setNamesrvAddr("192.168.35.128:9876");
// 启动实例
producer.start();
// 发送100个消息,每个消息
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//创建消息实例,指定topic、tag和消息主体,这里将topic省略了,因为它并不是必须的
Message msg = new Message("TopicOrderTest", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/*
* 调用send方法来发送消息,这里的send方法中第二个参数传递了一个队列选择器,是一个匿名内部类,实现了select方法
* 在select方法中,第二第三个参数Message msg, Object arg就是send方法中的第一和第三个参数的值
* 我们将订单号为0,4,8的消息放到第0个队列中,订单号为1,5,9的消息放到第1个队列中
* 将订单号为2,6的消息放到第2个队列中,订单号为3,7的消息放到第3个队列中
* 由于每一个订单号下对应着10个消息,那么在四个队列中分别存放了30,30,20,20个消息
*/
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id % mqs.size());
}
}, orderId);
// 将发送消息的结果打印下来
System.out.printf("%s%n", sendResult);
}
// 最后关闭生产者producer,只需关闭一次
producer.shutdown();
}
}
下面是消费者。之前我们使用的都是 MessageListenerConcurrently,表示并发消费;这里使用的消息监听器是MessageListenerOrderly。
package org.apache.rocketmq.example.ordermessage;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
// 实例化DefaultMQPushConsumer,同样这里需要设置消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumerGroup_name");
/*
* 可以通过下面这句话来设置name server的地址,当然你也可以通过环境变量来进行设置
* 因为我们在前面已经设置了环境变量,所以这里应该将这句话注释掉
*/
consumer.setNamesrvAddr("192.168.35.128:9876");
/*
* 在指定的消费者组是全新的情况下,指定从哪里开始消费。
* ConsumeFromWhere是一个枚举类
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* 指定要订阅的topic来进行消费,后面的表达式可以通过消息中的tag来对消息进行过滤
*/
consumer.subscribe("TopicOrderTest", "*");
/*
* 注册一个回调函数,这个回调函数会在消息从broker中取过来的时候调用执行
* 这里有一点非常关键,也是决定了这种方式是顺序消费消息的根本
* 这里注册的消息监听器是MessageListenerOrderly,注意与MessageListenerConcurrently的区别
* 使用前者就能够保证同一个线程总是从同一个队列中去取消息,而后者则不能保证这一点
* 所以,通过生产者和消费者的配合,生产者将同一个订单号下的所有消息都放在同一个队列中
* 而消费者端,同一个线程总是消费同一个队列中的消息,这样我们就能够保证,每一个订单号下面的所有消息被同一个线程消费
* 也就是顺序消息的机制
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
//开启消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
下面提取一些消费者打印日志,从日志中我们能够看出:相同线程消费的queueId总是相同的,而MessageListenerConcurrently则不保证这一点;另外从queueOffset也能够看出,由于队列的FIFO特性,消息是被顺序消费的。
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=173, queueOffset=31, sysFlag=0, bornTimestamp=1559657960379, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959358, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009A13B, commitLogOffset=631099, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47BB0004, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=173, queueOffset=32, sysFlag=0, bornTimestamp=1559657960383, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959362, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009A3EF, commitLogOffset=631791, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47BF0008, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=174, queueOffset=33, sysFlag=0, bornTimestamp=1559657960385, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959364, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009A549, commitLogOffset=632137, bodyCRC=193412630, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47C1000A, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=174, queueOffset=34, sysFlag=0, bornTimestamp=1559657960390, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959369, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009A801, commitLogOffset=632833, bodyCRC=216726031, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47C6000E, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 52], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=174, queueOffset=35, sysFlag=0, bornTimestamp=1559657960394, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959373, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009AAB9, commitLogOffset=633529, bodyCRC=89962020, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47CA0012, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 56], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=174, queueOffset=41, sysFlag=0, bornTimestamp=1559657960414, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959392, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009B8FF, commitLogOffset=637183, bodyCRC=1080943664, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47DE0027, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 57], transactionId='null'}]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=174, queueOffset=32, sysFlag=0, bornTimestamp=1559657960435, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959414, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009C8A1, commitLogOffset=641185, bodyCRC=717801981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=40, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47F3003E, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=3, storeSize=174, queueOffset=26, sysFlag=0, bornTimestamp=1559657960408, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959387, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009B4EB, commitLogOffset=636139, bodyCRC=548932910, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=40, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47D80021, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 51], transactionId='null'}]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=174, queueOffset=33, sysFlag=0, bornTimestamp=1559657960439, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959417, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009CB59, commitLogOffset=641881, bodyCRC=765791716, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=40, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47F70042, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 54], transactionId='null'}]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=174, queueOffset=34, sysFlag=0, bornTimestamp=1559657960444, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959423, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009CF6D, commitLogOffset=642925, bodyCRC=869529788, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=40, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47FC0048, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 50], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=174, queueOffset=42, sysFlag=0, bornTimestamp=1559657960416, bornHost=/192.168.35.1:9319, storeTimestamp=1559657959395, storeHost=/192.168.35.128:10911, msgId=C0A8238000002A9F000000000009BA5B, commitLogOffset=637531, bodyCRC=33028805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicOrderTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, UNIQ_KEY=6FBA7B3B103C18B4AAC2143D47E00029, WAIT=true}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52, 49], transactionId='null'}]]
版权声明:本文为gaishi_hero原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。