并发
有时候,我们需要加快消息的处理速度,这时候,我们可以通过提高消息处理程序的并发量,来提高消息的处理能力;
在rabbitListener中配置concurency=“min-max”
如下代表最小并发数是5,10代表最大并发
@RabbitListener(queues =“textQueue”,concurrency = “5-10”)
通过提高并发,来增强消息处理能力。
1、队列准备10条数据
2、消费者方法设置并发5-8
@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE},concurrency = "5-8")
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("接收到的MQ消息:"+message);
//处理业务
System.out.println("处理业务");
//手动Ack
/**
* 手动Ack参数说明
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:批量处理的标号,举例:这个队列现在有5条消息要消费,那么这批数据会标号从1-5递增,5的时候就会手动Ack multiple:是否批量处理
*
*/
System.out.println("deliveryTag:" + deliveryTag);
channel.basicAck(deliveryTag,true);
}catch (Exception e){
e.getMessage();
/**
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:是否送回队列
*/
try {
channel.basicNack(deliveryTag,false,true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
3、并发处理结果:
限流
有些场景,消费者端处理并发太大的时候,会影响消息处理端的性能,这时候需要限流
1、消费者RabbitMqServiceImpl配置限流
@Autowired
CachingConnectionFactory cachingConnectionFactory;
@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
//设置消息,每批最多获取3个
factory.setPrefetchCount(3);
return factory;
}
2、接收消息方法设置
@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE},containerFactory = "limitContainerFactory")
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("接收到的MQ消息:"+message);
//处理业务
System.out.println("处理业务");
//手动Ack
/**
* 手动Ack参数说明
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:批量处理的标号,举例:这个队列现在有5条消息要消费,那么这批数据会标号从1-5递增,5的时候就会手动Ack multiple:是否批量处理
*
*/
System.out.println("deliveryTag:" + deliveryTag);
channel.basicAck(deliveryTag,true);
}catch (Exception e){
e.getMessage();
/**
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:是否送回队列
*/
try {
channel.basicNack(deliveryTag,false,true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
3、限流结果
版权声明:本文为Stubborn_bull原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。