8 发布确认高级
由于不明原因导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。
8.1 发布与确认SpringBoot
生产者发送消息到交换机,交换机收到消息后,会触发确认后的回调方法。
8.1.1 配置文件
spring.rabbitmq.publisher-confirm-type=correlated
- none:默认值,禁用发布确认
- correlated:发布消息到交换机后触发回调方法
-
simple:有两种效果:第一种效果与correlated一样;其二,单个确认,在发布消息成功后使用rabbitTemplate,调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
8.1.2 配置类及编码
/**
* 发布确认高级
*/
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
//声明交换机
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder
.durable(CONFIRM_QUEUE_NAME)
.build();
}
//绑定
@Bean
public Binding queueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder
.bind(confirmQueue)
.to(confirmExchange)
.with(CONFIRM_ROUTING_KEY);
}
}
生产者
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
log.info("发送消息内容:{}", message);
}
消费者
/**
* 接收消息
*/
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message) {
String msg = new String(message.getBody());
log.info("接收到的队列confirm.queue的消息:{}", msg);
}
}
8.1.3 回调
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机接收到消息后的回调方法
*
* @param correlationData 保存了回调的消息ID及相关信息
* @param ack true/false:成功/失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机成功收到了ID为:{}的消息", id);
} else {
log.info("交换机未收到了ID为:{}的消息,原因为:{}", id, cause);
}
}
}
8.2 回退消息
8.2.1 Mandatory参数
Mandatory译为强制性的。在仅开启生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,也就是交换机通过routingKey找不到对应的队列,那么此消息会被直接丢弃,此时,生产者是不知道消息被丢弃的。如何让无法被路由的消息被处理;通过设置Mandatory参数可以在当消息传递的过程中不可路由时,将消息返回给生产者。
8.2.2 编码实现
实现RabbitTemplate.ReturnCallback接口,并实现其的方法returnedMessage
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机接收到消息后的回调方法
*
* @param correlationData 保存了回调的消息ID及相关信息
* @param ack true/false:成功/失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机成功收到了ID为:{}的消息", id);
} else {
log.info("交换机未收到了ID为:{}的消息,原因为:{}", id, cause);
}
}
/**
* 当消息不可路由时,返回给生产者
*
* @param message 消息
* @param replyCode 退回码
* @param replyText 退回原因
* @param exchange 交换机
* @param routingKey routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
String routingKey) {
log.error("消息:{},被交换机:{}退回,退回原因:{},路由Key:{}", new String(message.getBody()),
exchange, replyText, routingKey);
}
}
8.3 备份交换机
在上面我们提到如果生产者的消息发送到交换机,在发送的过程中出现了问题,并没有发送到交换机;我们需要进行消息的回退,让消息回退到生产者。这只是一种解决方法,还有一种较好的方法是增加一台备份交换机。让消息不在回退到消费者,而是发送到备份交换机上。
8.3.1 案例
增加备份交换机,并在发送失败时提供警告功能
配置信息
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
//警告队列
public static final String WARNING_QUEUE_NAME = "warning_queue";
//声明交换机
@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder
.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) //备份交换机参数:指定备份交换机
.build();
}
//声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder
.durable(CONFIRM_QUEUE_NAME)
.build();
}
//绑定
@Bean
public Binding queueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange)
{
return BindingBuilder
.bind(confirmQueue)
.to(confirmExchange)
.with(CONFIRM_ROUTING_KEY);
}
//备份交换机
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//备份队列
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
//警报队列
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//绑定备份队列与备份交换机
@Bean
public Binding backQueueBindingBackExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange)
{
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
//绑定警报队列与备份交换机
@Bean
public Binding warningQueueBindingBackExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange
backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
生产者
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, "1 " + message, correlationData);
log.info("发送消息内容:{}", "1 " + message);
//模拟不可路由发送的消息
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY + "2", "2 " + message, correlationData2);
log.info("发送消息内容:{}", "2 " + message);
}
警报消费者
/**
* 警报消费者
*/
@Slf4j
@Component
public class WarningConsumer {
//接收警报信息
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}
2022-12-02 23:35:43.526 INFO 76792 --- [nio-8080-exec-1] c.w.r.controller.ProducerController : 发送消息内容:1 啊啊啊
2022-12-02 23:35:43.528 INFO 76792 --- [nectionFactory1] com.wang.rabbitmq.config.MyCallBack : 交换机成功收到了ID为:1的消息
2022-12-02 23:35:43.529 INFO 76792 --- [nio-8080-exec-1] c.w.r.controller.ProducerController : 发送消息内容:2 啊啊啊
2022-12-02 23:35:43.533 ERROR 76792 --- [ntContainer#3-1] c.w.rabbitmq.consumer.WarningConsumer : 报警发现不可路由消息:2 啊啊啊
2022-12-02 23:35:43.533 INFO 76792 --- [ntContainer#0-1] c.w.rabbitmq.consumer.ConfirmConsumer : 接收到的队列confirm.queue的消息:1 啊啊啊
2022-12-02 23:35:43.555 INFO 76792 --- [nectionFactory1] com.wang.rabbitmq.config.MyCallBack : 交换机成功收到了ID为:2的消息
注
:当回退消息和备份交换机在配置文件application.properties同时存在时,
优先使用备份交换机
版权声明:本文为QQwli原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。