前面的文章分析了Rocketmq消息队列的常用功能原理,涉及到如下内容:
Rocketmq消息中间件实现了消息队列的基本功能,如消息发送,集群消费,广播消费,也实现了事务消息,定时消息,顺序消息等高级功能。
Rocketmq常见问题分析总结:
一、如何保证消息不重复消费
在Rocketmq中每条消息都有一个唯一的key代表不同消息实体,Rocketmq本身并不保证消息不被重复消费,它通过重试机制保证消息至少被消费者消费一次,因此可能在网络抖动情况下出现消息重复投递的情况,如何达到消息不被重复消费呢?
这里介绍笔者工作中常用的两种方式:
1、消息消费记录表
消息业务id作为主键或者唯一id,每次消费前判断是否存在消费记录,可以尝试插入操作,如果已经存在则会报主键冲突(类似于redis的setNx指令效果)。存在则不消费,否则继续消费。
2、通过redis的setnx指令
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这里保证key是消息的唯一标志。
二、消息堆积如何处理
1、提高消费并行度
同一个ConsumerGroup,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
提高单个 Consumer 的消费并行线程(扩大消费者线程池,默认20个核心线程),通过修改参数 consumeThreadMin、consumeThreadMax实现。
2、批量消费方式
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过`设置 consumer的 consumeMessageBatchMaxSize 参数`,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
3、跳过非重要消息
过滤掉不重要的消息,追赶上生产者进度发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。
如何定位具体哪个线程消费消息慢?
通过jstack 查看消费者线程状态,判断阻塞线程。
三、Rocketmq如何保证消息不丢失
消息丢失这个话题必须在Rocketmq的各个组成角色的角度来分析。
1、生产者自带重试功能
至多重试2次。
如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
如果本身向broker发送消息产生超时异常,就不会再重试。
2、broker高可用和高可靠
主从同步机制
brokerRole broker角色类型
ASYNC_MASTER, 异步同步主节点
SYNC_MASTER, 同步同步主节点
SLAVE; 从节点
DiskFlush 刷盘策略
默认写入缓冲区,不会同步落盘,通过定时5s进行刷新落盘
SYNC_FLUSH, 同步刷盘,刷盘完成再返回给客户端,超时5s
ASYNC_FLUSH,异步刷盘,200ms刷新一次,性能高
3、消费者
消费者消费消息时关闭自动提交ack,而是采用手动提交消息offset,消费者消费消息不成功,不返回CONSUME_SUCCESS,返回RECONSUME_LATER表示需要broker再次投递该消息。
生产者和broker保证消息不丢失,可能导致消息重复投递,因此消费者需要做幂等性处理。
最后看下Rocketmq和Kafka消息中间件的对比:
学习了Rocketmq后发现Rocketmq确实是一个优秀的消息中间件,它不仅有阿里等大厂大流量的使用和历练,还拥有活跃的社区,最新的文档,不停的升级迭代,源码学习可以了解消息中间件的运行原理,对消息队列的实现机制更加掌握。
2022新的开始,祝大家新年快乐,2022继续学习新技术,一起继续进步!!