16.RabbitMQ 消费端并发和限流设置

  • Post author:
  • Post category:其他




并发

有时候,我们需要加快消息的处理速度,这时候,我们可以通过提高消息处理程序的并发量,来提高消息的处理能力;

在rabbitListener中配置concurency=“min-max”

如下代表最小并发数是5,10代表最大并发

@RabbitListener(queues =“textQueue”,concurrency = “5-10”)

通过提高并发,来增强消息处理能力。

1、队列准备10条数据

在这里插入图片描述

2、消费者方法设置并发5-8

@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE},concurrency = "5-8")
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

    try {
        System.out.println("接收到的MQ消息:"+message);
        //处理业务
        System.out.println("处理业务");
        //手动Ack
        /**
         * 手动Ack参数说明
         * basicAck(long deliveryTag, boolean multiple)
         * deliveryTag:批量处理的标号,举例:这个队列现在有5条消息要消费,那么这批数据会标号从1-5递增,5的时候就会手动Ack  multiple:是否批量处理
         *
         */
        System.out.println("deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,true);
    }catch (Exception e){
        e.getMessage();
        /**
         * basicNack(long deliveryTag, boolean multiple, boolean requeue)
         * requeue:是否送回队列
         */
        try {
            channel.basicNack(deliveryTag,false,true);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

}

3、并发处理结果:

在这里插入图片描述



限流

有些场景,消费者端处理并发太大的时候,会影响消息处理端的性能,这时候需要限流

1、消费者RabbitMqServiceImpl配置限流

@Autowired
CachingConnectionFactory cachingConnectionFactory;

@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
    SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    //设置消息,每批最多获取3个
    factory.setPrefetchCount(3);
    return factory;
}

2、接收消息方法设置

@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE},containerFactory = "limitContainerFactory")
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

    try {
        System.out.println("接收到的MQ消息:"+message);
        //处理业务
        System.out.println("处理业务");
        //手动Ack
        /**
         * 手动Ack参数说明
         * basicAck(long deliveryTag, boolean multiple)
         * deliveryTag:批量处理的标号,举例:这个队列现在有5条消息要消费,那么这批数据会标号从1-5递增,5的时候就会手动Ack  multiple:是否批量处理
         *
         */
        System.out.println("deliveryTag:" + deliveryTag);
        channel.basicAck(deliveryTag,true);
    }catch (Exception e){
        e.getMessage();
        /**
         * basicNack(long deliveryTag, boolean multiple, boolean requeue)
         * requeue:是否送回队列
         */
        try {
            channel.basicNack(deliveryTag,false,true);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

}

3、限流结果

在这里插入图片描述



版权声明:本文为Stubborn_bull原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。