RabbitMQ 死信队列的简单使用
关于RabbitMQ死信队列
死信队列,顾名思义,就是存放死信的队列,消息死掉的方式通常有以下几种:
-
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息 - 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
-
队列超载
当业务(普通)队列里的消息变为“死信”时, 被重新投递(publish)到另一个Exchange ,该 Exchange 就是DLX,然后该Exchange 根据绑定规则 转发到对应的队列上,监听该队列 就可以重新消费,说白了就是没有被消费的消息换个地方重新被消费。
生产者 –> 消息 –> 交换机 –> 队列 –> 变成死信 –> DLX交换机 –>队列 –> 消费者
实现步骤
之前的队列没有绑定死信队列和死信交换机 不能做更改绑定死信交互机
之前创建好的邮件队列 删除掉 已经创建好的队列不能做更改 交换机也清理掉
1、声明业务交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("test-exchange",true,false);
}
2、声明业务队列
@Bean
public Queue testQueue() {
Map<String, Object> map = new HashMap<>(2);
map.put("x-dead-letter-exchange","dead-letter-exchange");
map.put("x-dead-letter-routing-key","dead-letter-routing-key");
return new Queue("testQueue",true,false,false,map);
}
定义业务(普通)队列的时候指定参数
x-dead-letter-exchange: 用来指定死信交换机的名称
x-dead-letter-routing-key:用来设置死信的routingKey
当业务队列的消息成为死信时,会自动按照指定参数将死信发送到死信队列。
3、绑定业务交换机和业务队列
@Bean
public Binding testQueueBanding(Queue testQueue,TopicExchange topicExchange) {
return BindingBuilder.bind(testQueue).to(topicExchange).with("test-routing-key");
}
4、声明死信队列
@Bean
public Queue deadQueue() {
return new Queue("dead-queue",true,false,false);
}
5、声明死信交换机
@Bean
public TopicExchange deadExchange() {
return new TopicExchange("dead-letter-exchange",true,false);
}
6、绑定死信交换机和死信队列
@Bean
public Binding deadQueueBinding(Queue deadQueue,TopicExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead-letter-routing-key");
}
7、消息生产者
package com.br.dataStrategy.web;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: jianmin.li
* @Description: 死信队列测试
* @Date: 2019/6/5 10:37
* @Version: 1.0
*/
@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/rabbitmq")
public void method() {
amqpTemplate.convertAndSend("test-exchange","test-routing-key","死信队列测试");
}
}
8、业务队列消费者
@RabbitListener(queues = "testQueue")
public void testQuene(Message message,Channel channel) throws IOException {
System.err.println("==============测试testQueue接收到消息了=================" + new String(message.getBody()
,"utf-8"));
//让消息成为死信
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
9、死信队列消费者
@RabbitListener(queues = "dead-queue")
public void deadQueue(Message message,Channel channel) throws IOException {
System.err.println("===================死信队列dead-queue接收到消息了==================" + new String(message.getBody()
,"utf-8"));
//1、将消息存储进数据库
//2、写一个定时任务,定期将死信消息取出来并处理掉
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
启动项目,发送消息
参考博文
版权声明:本文为qq_40184563原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。