Rocketmq常见问题与阶段性总结

  • Post author:
  • Post category:其他

​前面的文章分析了Rocketmq消息队列的常用功能原理,涉及到如下内容:

Rocketmq消息中间件实现了消息队列的基本功能,如消息发送,集群消费,广播消费,也实现了事务消息,定时消息,顺序消息等高级功能。

Rocketmq常见问题分析总结:

一、如何保证消息不重复消费 

    在Rocketmq中每条消息都有一个唯一的key代表不同消息实体,Rocketmq本身并不保证消息不被重复消费,它通过重试机制保证消息至少被消费者消费一次,因此可能在网络抖动情况下出现消息重复投递的情况,如何达到消息不被重复消费呢?

这里介绍笔者工作中常用的两种方式:

1、消息消费记录表

    消息业务id作为主键或者唯一id,每次消费前判断是否存在消费记录,可以尝试插入操作,如果已经存在则会报主键冲突(类似于redis的setNx指令效果)。存在则不消费,否则继续消费。

2、通过redis的setnx指令

    Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这里保证key是消息的唯一标志。

二、消息堆积如何处理 

1、提高消费并行度 

    同一个ConsumerGroup,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。

   提高单个 Consumer 的消费并行线程(扩大消费者线程池,默认20个核心线程),通过修改参数 consumeThreadMin、consumeThreadMax实现。 

2、批量消费方式 

    某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过`设置 consumer的 consumeMessageBatchMaxSize 参数`,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。 

3、跳过非重要消息 

    过滤掉不重要的消息,追赶上生产者进度发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。

如何定位具体哪个线程消费消息慢?

通过jstack 查看消费者线程状态,判断阻塞线程。

三、Rocketmq如何保证消息不丢失

消息丢失这个话题必须在Rocketmq的各个组成角色的角度来分析。

1、生产者自带重试功能

      至多重试2次。

     如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。

    如果本身向broker发送消息产生超时异常,就不会再重试。 

2、broker高可用和高可靠 

主从同步机制 

brokerRole broker角色类型

ASYNC_MASTER, 异步同步主节点

SYNC_MASTER, 同步同步主节点 

SLAVE; 从节点 

DiskFlush 刷盘策略 

默认写入缓冲区,不会同步落盘,通过定时5s进行刷新落盘

SYNC_FLUSH, 同步刷盘,刷盘完成再返回给客户端,超时5s

ASYNC_FLUSH,异步刷盘,200ms刷新一次,性能高 

3、消费者

    消费者消费消息时关闭自动提交ack,而是采用手动提交消息offset,消费者消费消息不成功,不返回CONSUME_SUCCESS,返回RECONSUME_LATER表示需要broker再次投递该消息。

生产者和broker保证消息不丢失,可能导致消息重复投递,因此消费者需要做幂等性处理。 

最后看下Rocketmq和Kafka消息中间件的对比:

学习了Rocketmq后发现Rocketmq确实是一个优秀的消息中间件,它不仅有阿里等大厂大流量的使用和历练,还拥有活跃的社区,最新的文档,不停的升级迭代,源码学习可以了解消息中间件的运行原理,对消息队列的实现机制更加掌握。

2022新的开始,祝大家新年快乐,2022继续学习新技术,一起继续进步!!


版权声明:本文为daimingbao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。