前文回顾:
RocketMQ知识盘点【壹】_Producer和NameServer
1.模式
1.1 模式类型
分为推模式和拉模式。
推模式是broker向consumer注册一个listener接口,收到消息后会回调listener接口,采用长轮询方式实现push;
拉模式是consumer主动向broker拉消息,下文介绍。本质上,推模式是在拉模式上包装了一层,一个拉取任务完成后开始下一个拉取任务。
简短截说,拉模式下consumer端会使用PullMessageService(本质是一个线程),不断构造PullRequest到pullRequestQueue,经历负载均衡向broker发送拉取消息请求。每次默认拉取32个消息,存放到ProcessQueue中,当消息消费成功再从中移除。默认情况消息超过15分钟未消费成功,则延迟3个延迟级别进行重试。
推模式是基于拉模式实现的,在于PullRequest完成一次拉取消息任务后,又再次把它放回pullRequestQueue中,实现循环请求拉取。如果consumer向broker发送消息拉取请求后,没有消息到达消费队列:
如果未启用长轮询机制,则毁在服务端等待shortPollingTimeMills(挂起)后再判断消息是否到达消息队列,如果消息仍未到达则提示消息不存在(PULL_NOT_FOUND)。
如果启动长轮询机制,一方面会每10s检查消息是否到达,同时一有新消息到达马上通知挂起线程再次验证新消息是否感兴趣,如果不是则继续等待直到挂起超时(默认15s)。
长轮询模式通过longPollingEnable=true设置。
1.2 比较
推模式:broker负责消息存储、处理请求、推送状态等
优点是实时性好;缺点是如果push能力大于消费能力,则可能造成消费者挂掉或消息丢失;
拉模式:consumer除了消费消息还要保存偏移量,以及异常场景下消息缓存等
优点是拉取频率可以自己决定;缺点是可能造成broker消息堆积;
2 负载均衡
消息的分配遵循一个原则:一个consumer可以消费多个messageQueue,但是一个messageQueue只能分配给一个consumer消费。rocketMq共提供了5种分配算法:
1.AllocateMessageQueueAveragely:平均分配;
2.AllocateMessageQueueAveragelyByCircle:平均轮询分配;
3.AllocateMessageConsistentHash:一致性哈希;
4.AllocateMessageQueueByConfig:自定义配置;
5.AllocateMessageQueueByMacheRoom:根据broker机房名。
注意,基于上面的原则,当consumer数量大于messageQueue数量时,会有一部分consumer无法消费消息。
3 顺序消息
rocketMQ不保证一个topic下的所有messageQueue消息顺序性,但可以保证一个messageQueue下消息的顺序性。
4 延时消息
RocketMQ不支持自定义延时,目前共支持18个延时等级,分别为
1s,5s,10s,30s,1min,2min,3min,4min,5min,6min,7min,8min,9min,10min,20min,30min,1h,2h
在每个broker有一个默认topic,叫SCHEDULE_TOPIC_XXXX,这个topic下共有18个messageQueue,queueId从0到17,分别对应这18个延迟等级。同时对每个messageQueue设置一个定时任务,每隔1s到对应的messageQueue从最近一次处理的offset开始捞取要处理的消息,根据消息的物理偏移量和长度到commitLog里拿到完整消息体,然后构造出新的消息顺序写入commitLog,再将其投入到相应的messageQueue中去。
5 消息重试
集群模式下,消息的消费进度保存在broker(广播模式broker把消息广播出去就不负责了)。如果consumer返回RECONSUME_LATER,则broker启动消息重试机制。先将消息的topic和messageQueue保存到消息属性,然后将此消息存储到SCHEDULE_TOPIC_XXXX相应的延迟级别队列中。当到达重试时间后,该消息会被投入到topic为“%RETRY%消费者组名”的队列中,每台broker只会为同一个消费者组创建一个重试队列。“%RETRY%消费者组名”这个topic,在concumer启动的时候已经自动关注,所以当后面consumer来拉取消息时,会直接将重试消息拉走消费。
最佳实践:建议不要依赖rocketMQ的消息重试,可以自己在应用通过延时消息完成延时消息重试。
6 死信队列
当一条消息重试16次仍然失败,则进入死信队列,topic为“%DLQ%消费者组名”。不再被消费,需要人工处理。这里注意,每次重试消息延迟级别不是delayLevel++的,而是有一套根据重试次数的算法。
7 消息过滤
可以通过配置,使用消息的tag或表达式过滤。