RocketMQ知识盘点【叁】_Consumer

  • Post author:
  • Post category:其他


前文回顾:


RocketMQ知识盘点【壹】_Producer和NameServer


RocketMQ知识盘点【贰】_Broker和消息存储

1.模式

1.1 模式类型

分为推模式和拉模式。

推模式是broker向consumer注册一个listener接口,收到消息后会回调listener接口,采用长轮询方式实现push;

拉模式是consumer主动向broker拉消息,下文介绍。本质上,推模式是在拉模式上包装了一层,一个拉取任务完成后开始下一个拉取任务。

简短截说,拉模式下consumer端会使用PullMessageService(本质是一个线程),不断构造PullRequest到pullRequestQueue,经历负载均衡向broker发送拉取消息请求。每次默认拉取32个消息,存放到ProcessQueue中,当消息消费成功再从中移除。默认情况消息超过15分钟未消费成功,则延迟3个延迟级别进行重试。

推模式是基于拉模式实现的,在于PullRequest完成一次拉取消息任务后,又再次把它放回pullRequestQueue中,实现循环请求拉取。如果consumer向broker发送消息拉取请求后,没有消息到达消费队列:

如果未启用长轮询机制,则毁在服务端等待shortPollingTimeMills(挂起)后再判断消息是否到达消息队列,如果消息仍未到达则提示消息不存在(PULL_NOT_FOUND)。

如果启动长轮询机制,一方面会每10s检查消息是否到达,同时一有新消息到达马上通知挂起线程再次验证新消息是否感兴趣,如果不是则继续等待直到挂起超时(默认15s)。

长轮询模式通过longPollingEnable=true设置。

1.2 比较

推模式:broker负责消息存储、处理请求、推送状态等

优点是实时性好;缺点是如果push能力大于消费能力,则可能造成消费者挂掉或消息丢失;

拉模式:consumer除了消费消息还要保存偏移量,以及异常场景下消息缓存等

优点是拉取频率可以自己决定;缺点是可能造成broker消息堆积;

2 负载均衡

消息的分配遵循一个原则:一个consumer可以消费多个messageQueue,但是一个messageQueue只能分配给一个consumer消费。rocketMq共提供了5种分配算法:

1.AllocateMessageQueueAveragely:平均分配;

2.AllocateMessageQueueAveragelyByCircle:平均轮询分配;

3.AllocateMessageConsistentHash:一致性哈希;

4.AllocateMessageQueueByConfig:自定义配置;

5.AllocateMessageQueueByMacheRoom:根据broker机房名。

注意,基于上面的原则,当consumer数量大于messageQueue数量时,会有一部分consumer无法消费消息。

3 顺序消息

rocketMQ不保证一个topic下的所有messageQueue消息顺序性,但可以保证一个messageQueue下消息的顺序性。

4 延时消息

RocketMQ不支持自定义延时,目前共支持18个延时等级,分别为

1s,5s,10s,30s,1min,2min,3min,4min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h

在每个broker有一个默认topic,叫SCHEDULE_TOPIC_XXXX,这个topic下共有18个messageQueue,queueId从0到17,分别对应这18个延迟等级。同时对每个messageQueue设置一个定时任务,每隔1s到对应的messageQueue从最近一次处理的offset开始捞取要处理的消息,根据消息的物理偏移量和长度到commitLog里拿到完整消息体,然后构造出新的消息顺序写入commitLog,再将其投入到相应的messageQueue中去。

5 消息重试

集群模式下,消息的消费进度保存在broker(广播模式broker把消息广播出去就不负责了)。如果consumer返回RECONSUME_LATER,则broker启动消息重试机制。先将消息的topic和messageQueue保存到消息属性,然后将此消息存储到SCHEDULE_TOPIC_XXXX相应的延迟级别队列中。当到达重试时间后,该消息会被投入到topic为“%RETRY%消费者组名”的队列中,每台broker只会为同一个消费者组创建一个重试队列。“%RETRY%消费者组名”这个topic,在concumer启动的时候已经自动关注,所以当后面consumer来拉取消息时,会直接将重试消息拉走消费。

最佳实践:建议不要依赖rocketMQ的消息重试,可以自己在应用通过延时消息完成延时消息重试。

6 死信队列

当一条消息重试16次仍然失败,则进入死信队列,topic为“%DLQ%消费者组名”。不再被消费,需要人工处理。这里注意,每次重试消息延迟级别不是delayLevel++的,而是有一套根据重试次数的算法。

7 消息过滤

可以通过配置,使用消息的tag或表达式过滤。


RocketMQ知识盘点【壹】_Producer和NameServer


RocketMQ知识盘点【贰】_Broker和消息存储


RocketMQ知识盘点【叁】_Consumer


RocketMQ知识盘点【肆】_最佳实践



版权声明:本文为xinzun原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。