RocketMQ有序性分析

  • Post author:
  • Post category:其他

RocketMQ可以保证queue的消息顺序。
如果希望保证消息顺序,可以这怎么做:
(1)Producer对于需要顺序的消息发送到同一个queue中
(2)Consumer使用MessageListenerOrderly来对消息进行有序消费

代码示例在之前写过:https://blog.csdn.net/lblblblblzdx/article/details/87939187

接下来以push模式为例,详细分析消息流转的整个过程,是如何保障消息在经过多次流转后仍保持有序的。

1. Producer同步发送

对于需要有序的消息

  • 需要选择同一个queue,因为RocketMQ只保证queue的局部有序,无法保证topic的消息有序。
  • 需要等待上一个发送成功,收到成功的回复后,才可以发送下一个。
2. broker接收消息并落盘

broker底层使用netty,对于同一个连接,均会使用同一个线程来处理其I/O请求,在读取到RequestCommand后,会交给业务线程池异步处理,业务线程会将消息取出并且存储到commitLog中。

可以看出broker是会异步处理消息的,所以第一步中的Producer必须等待上一个消息完全成功后,才允许发送下一个,否则会导致在broker落盘的环节无序。

此时,依然可以保证commitLog中的消息是有序的。

3. Consumer订阅topic,并分摊队列

默认的消费模式是集群消费,在同一个消费组中的消费者会使用同样的负载均衡算法来分摊队列,一个消费者可以分摊到若干个队列,且锁定这些队列,这些队列只允许被这个消费者进行消费。

4. Consumer异步拉取消息

分摊完队列后,假如Consumer分摊到queue-0和queue-1,则会构造两个相应的pullRequest放到pullRequestQueue中。

Consumer在start的时候会启动一个线程PullMessageService来监听pullRequestQueue中的对象,一旦存在对象则取出处理,此处会异步拉取消息。不同队列的pull请求可能会同时在发送中,但是对于同一个队列,只有上一个pull请求完成之后,才可以生成下一个pull请求放到pullRequestQueue中。

总的来说,Consumer会并行拉取多个队列的消息,但对于同一个队列,拉取请求依然是串行的。

至此,可以知道的是,topic下的某一个队列,只会被某一个Consumer单线程拉取,所以队列的消息会按序的抵达Consumer。

5. Consumer异步处理消息

在上一步中,队列的消息按序的抵达Consumer后,会添加到队列对应的processQueue中,并提交一个异步的ConsumeRequest。ConsumeRequest在执行的过程,会申请队列对应的锁,所以对于同一个队列,ConsumeRequest依然是串行执行,对于不同的队列,ConsumerRequest是并行执行。
最终,Consumer可以按序消费同一个processQueue中的消息。

整个流程是这样的:

单线程按序同步发送-> broker按序落盘 -> Consumer串行拉取同一个队列消息 -> Consumer串行处理同一个队列消息

可以看到,整个流程用到了大量的串行处理来保证顺序,但是串行并不是单线程,而是对于同一个队列串行,对于不同队列并行。

在这里插入图片描述


版权声明:本文为lblblblblzdx原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。