43 RocketMQ对百万消息积压问题的处理方案

  • Post author:
  • Post category:其他


业务场景:

在一个系统中,由生产者系统和消费者系统两个环节组成,生产者系统不停的把消息写入RocketMQ里去,然后消费者系统就负责从RocketMQ里消费消息。

该系统在生产环境的高峰期内,大概有100多万条消息进入MQ。然后消费者系统从MQ里获取消息,并依赖Redis去进行一些业务逻辑的实现。

某天在高峰期突然间出了问题,消费者系统依赖的Redis挂掉了,导致消费者系统也阻塞无法继续执行下去了。

消费者系统的阻塞停止运行,意味着不会再从RocketMQ里去消费数据和处理了。而此时的生产者系统依旧不间断往MQ里写入100多万的消息,那么这些消息都会积压在MQ里,而无法被及时消费和处理掉。

解决方案

此时有几种解决方案,如果这些消息是允许丢失的,那么就紧急修改消费者系统的代码,在代码中对所有的消息都获取到后

直接丢弃

,也就是不做任何的处理,这样可以快速的将积压的百多万消息处理掉。这是丢弃的方案。

而如果消息是不能丢弃掉的话,就必须在消费者系统底层依赖的Redis恢复之后,根据线上Topic的MessageQueue的数量来采取相应的处理方案。


MessageQueue众多的情况下

假如Topic有20个MessageQueue,而目前只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时仅仅依靠4个消费者系统是不能满足快速处理MQ中积压的百多万消息的。

此时可以再申请16台机器去部署16台消费者系统实例,然后20个消费者系统同时消费,每个实例消费一个MessageQueue的消息,此时的消费速度就会提高了5倍,很快就能将积压的百多万消息处理掉。


MessageQueue等于消费者实例数量的情况下

假如Topic有4个MessageQueue,而集群中的消费者系统也是正好4个,那么就是每个消费者系统去消费一个MessageQueue里的消息,此时就无法使用上面增加消费者实例的方案来提高消费速度了。

此时应该修改4个消费者系统的代码,让他们取到消息后不写入Redis,而是直接把消息写入一个新的Topic,这样速度就会很快了,因为只是读写MQ而已。

然后新的Topic有20个MessageQueue,再部署20台消费者实例,去消费新的Topic的消息后写入到Redis中,这样就可以快速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。



版权声明:本文为weixin_42405670原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。