RabbitMq延时队列(基于死信 and 基于插件)
死信
什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
1:消息 TTL 过期
2:队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3:消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
代码演示
消息过期放入死信
消费者A
package com.leava.cloud.deadMsg;
import com.google.common.collect.Maps;
import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.Map;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 消费者 死信队列
*/
public class ConsumerO1 {
//声明普通队列
public static final String ORDINARY_QUEUE = "ordinary_queue";
//声明普通交换机
public static final String ORDINARY_CHANGE = "ordinary_change";
//声明死信队列
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
//声明死信交换机
public static final String DEAD_LETTER_CHANGE = "dead_letter_change";
//声明普通routingKey
public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";
//声明死信routingKey
public static final String DEAD_ROUTING_KEY = "dead_routing_key";
public static void main(String[] args) throws Exception {
Channel channel = MqConnectionUtil.getChannel();
/**
* 创建普通交换机 类型 为 direct
*/
channel.exchangeDeclare(ORDINARY_CHANGE, BuiltinExchangeType.DIRECT);
/**
* 创建死信交换机 类型 为 direct
*/
channel.exchangeDeclare(DEAD_LETTER_CHANGE, BuiltinExchangeType.DIRECT);
/**
* 创建普通队列 指定 死信交换机 以及死信 routingkey
*/
Map<String, Object> arguments = Maps.newHashMap();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
channel.queueDeclare(ORDINARY_QUEUE, true, false, false, arguments);
/**
* 创建死信队列
*/
channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);
System.out.println("等待接收消息......");
/**
* 绑定普通队列和普通交换机
*/
channel.queueBind(ORDINARY_QUEUE, ORDINARY_CHANGE, ORDINARY_ROUTING_KEY);
/**
* 绑定死信队列和死信交换机
*/
channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_CHANGE, DEAD_ROUTING_KEY);
/**
* 接收消息 不自动应答
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到消息>>>" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicQos(1); //不公平分发
channel.basicConsume(ORDINARY_QUEUE, false, deliverCallback, (consumerTag -> {
System.out.println("接收消息失败>>>" + consumerTag);
}));
}
}
消费者B
package com.leava.cloud.deadMsg;
import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 消费者 死信队列
*/
public class ConsumerO2 {
//声明死信队列
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static void main(String[] args) throws Exception {
Channel channel = MqConnectionUtil.getChannel();
System.out.println("死信队列等待接收消息......");
/**
* 接收消息 不自动应答
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到消息>>>" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
channel.basicQos(1); //不公平分发
channel.basicConsume(DEAD_LETTER_QUEUE, false, deliverCallback, (consumerTag -> {
System.out.println("接收消息失败>>>" + consumerTag);
}));
}
}
生产者
package com.leava.cloud.deadMsg;
import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/18 0018
*/
public class Producer {
//声明普通交换机
public static final String ORDINARY_CHANGE = "ordinary_change";
//声明普通routingKey
public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";
public static void main(String[] args) throws Exception {
Channel channel = MqConnectionUtil.getChannel();
/**
* 死信队列 设置TTL 过期时间 10S
*/
AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().expiration("10000").build();
for (int i = 0; i < 11; i++) {
String message = "info:"+i;
channel.basicPublish(ORDINARY_CHANGE,ORDINARY_ROUTING_KEY,build,message.getBytes());
}
}
}
演示
测试流程: 先启动 消费者A 再启动消费者B 接着停掉消费者A 启动生产者
消息先进入普通队列中接着等待10S超过了设置的时候,那么就会进入死信队列
由死信队列进行消费
队列达到最大长度放入死信
在原来的消费者A中加入这一行代码即可
演示
删除掉原来的交换机和队列重新启动消费者A再启动消费B 启动生产者
可以看到 首次发消息是通过普通队列 之后就会截取每超过6条就会截取到死信队列中
消息被拒绝放入死信
修改消费者A的代码 拒绝消息为info:5的 将其放入死信
演示
注释掉最大长度放入死信的影响 并且删除掉原来的队列
启动消费者A 和 B 启动生产者
可以看到达到了我们的预期
延时队列
基于死信的延时队列
新建SpringBoot项目 在pom文件中加入依赖
我的是微服务项目, 读者可以去掉nacos客户端和feign客户端
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--添加nacos客户端-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!--添加Lombok插件依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
死信队列配置
package com.leava.cloud.config;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 基于死信的延时队列
*/
@Configuration
public class DeadTTlConfig {
//声明普通交换机
public static final String ORDINARY_CHANGE = "ordinary_change";
//声明死信交换机
public static final String DEAD_LETTER_CHANGE = "dead_letter_change";
//声明普通队列1
public static final String ORDINARY_QUEUE = "ordinary_queue";
//声明普通队列2
public static final String ORDINARY_QUEUE_TWO = "ordinary_queue_two";
//声明死信队列
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
//声明普通队列1和普通交换机的 routingKey
public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";
//声明普通队列2和普通交换机的 routingKey
public static final String ORDINARY_TWO_ROUTING_KEY = "ordinary_two_routing_key";
//声明死信交换机和死信队列的routingKey
public static final String DEAD_ROUTING_KEY = "dead_routing_key";
/**
* 创建普通交换机
*/
@Bean("ordinaryChange")
public DirectExchange ordinaryChange() {
return new DirectExchange(ORDINARY_CHANGE);
}
/**
* 创建死信交换机
*/
@Bean("deadLetterChange")
public DirectExchange deadLetterChange() {
return new DirectExchange(DEAD_LETTER_CHANGE);
}
/**
* 创建普通队列1
*/
@Bean("ordinaryQueue")
public Queue ordinaryQueue() {
Map<String, Object> arguments = Maps.newHashMap();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return QueueBuilder.durable(ORDINARY_QUEUE).withArguments(arguments).build();
}
/**
* 创建普通队列2
*/
@Bean("ordinaryQueueTwo")
public Queue ordinaryQueueTwo() {
Map<String, Object> arguments = Maps.newHashMap();
arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return QueueBuilder.durable(ORDINARY_QUEUE_TWO).withArguments(arguments).build();
}
/**
* 创建死信队列
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
/**
* 绑定普通队列1和普通交换机
*/
@Bean
public Binding ordinaryQueueBindingOrdinaryChange(
@Qualifier("ordinaryChange") DirectExchange ordinaryChange,
@Qualifier("ordinaryQueue") Queue ordinaryQueue) {
return BindingBuilder.bind(ordinaryQueue).to(ordinaryChange).with(ORDINARY_ROUTING_KEY);
}
/**
* 绑定普通队列2和普通交换机
*/
@Bean
public Binding ordinaryQueueTwoBindingOrdinaryChange(
@Qualifier("ordinaryChange") DirectExchange ordinaryChange,
@Qualifier("ordinaryQueueTwo") Queue ordinaryQueueTwo) {
return BindingBuilder.bind(ordinaryQueueTwo).to(ordinaryChange).with(ORDINARY_TWO_ROUTING_KEY);
}
/**
* 绑定死信队列和死信交换机
*/
@Bean
public Binding deadQueueTwoBindingOrdinaryChange(
@Qualifier("deadLetterChange") DirectExchange deadLetterChange,
@Qualifier("deadLetterQueue") Queue deadLetterQueue) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterChange).with(DEAD_ROUTING_KEY);
}
}
监听器
package com.leava.cloud.monitor;
import com.leava.cloud.config.DeadTTlConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 监听
*/
@Slf4j
@Component
public class DeadTTlQueueListen {
@RabbitListener(queues = DeadTTlConfig.DEAD_LETTER_QUEUE)
public void receiveDeadQueue(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
生产者
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
*/
@Slf4j
@RestController
@RequestMapping("/dead/send")
public class DeadController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/message/{msg}/{ttlTime}")
public void message(@PathVariable("msg") String msg,@PathVariable("ttlTime") String ttlTime){
log.info("发送消息{},过期时间为:"+ttlTime);
rabbitTemplate.convertAndSend(DeadTTlConfig.ORDINARY_CHANGE,DeadTTlConfig.ORDINARY_ROUTING_KEY,msg, correlationData->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
}
}
测试
首先发第一条消息 msg为 baby1 ttlTime为20000 代表20S过期 进入死信队列进行消费
接着发第二条消息 msg为baba2 ttlTime为10000 代表10S过期 进入死信队列进行消费
观察两条消息的运行前后顺序
发现一个问题,我们第二次发的信息的ttl是比第一次发的信息的过期时间要短的,应该是先消费第二条,再消费第一条,但是还是遵循这先进先出的原则。
基于插件的延时队列
安装延时队列插件
地址: https://www.rabbitmq.com/community-plugins.html
下载rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
出现下面这个东西 证明安装成功
演示
package com.leava.cloud.config;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 基于插件的延时队列
*/
@Configuration
public class DelayedConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue.name";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange.name";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey.name";
/**
* 自定义交换机
*/
@Bean
public CustomExchange customExchanges(){
Map<String, Object> arguments = Maps.newHashMap();
arguments.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
/**
* 创建队列
*/
@Bean
public Queue delayedQ(){
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
/**
* 绑定
*/
@Bean
public Binding customBindingQ(@Qualifier("customExchanges") CustomExchange customExchanges,
@Qualifier("delayedQ") Queue delayedQ){
return BindingBuilder.bind(delayedQ).to(customExchanges).with(DELAYED_ROUTING_KEY).noargs();
}
}
package com.leava.cloud.monitor;
import com.leava.cloud.config.DelayedConfig;
import com.leava.cloud.config.DelayedQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Description
* @Author mq
* @Version V1.0.0
* @Date 2021/6/19 0019
* 基于插件的监听器
*/
@Slf4j
@Component
public class DelayedListen {
@RabbitListener(queues= DelayedConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}
@GetMapping("/message_delayed/{msg}/{delayedTime}")
public void messageDelayed(@PathVariable("msg") String msg,@PathVariable("delayedTime") Integer delayedTime){
log.info("发送消息{},延时时长为:{}",msg,delayedTime);
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE_NAME,DelayedConfig.DELAYED_ROUTING_KEY,msg , correlationData->{
correlationData.getMessageProperties().setDelay(delayedTime);
return correlationData;
});
}
测试
可以看到解决了上面那个问题,不再是先进先出的规则