-
业务描述
由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):
-
producer如何实现
producer只需发送消息时调用如下方法即可
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
关键是如何实现MessageQueueSelector:
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
这样,所有的消息会根据消息的尾号,轮询的落到相应的queue上。参考图2,假设id=10001231,由于一共有20个queue,所以10001231%20=11,故消息会落到broker-b queue-1上。
-
consumer端如何实现
针对consumer由于没有限制是顺序消费,故可以采用集群消费模式的DefaultMQPushConsumer,由于一个消费者消费一类queue,故需要10个consumer group,比如consumer group0需要消费的queue为broker-a queue-0和broker-b queue-0,如下图的概示:
那么需要自己实现一个AllocateMessageQueueStrategy进行queue的分配,我们假设consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那么实现如下:- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
借助AllocateMessageQueueAveragely来实现,以便有多个jvm的消费者时,能够进行集群消费,但是针对上面这个例子,消费者jvm实例不能超过2个,至于为什么,参照下图: