RabbitMQ 发布确认高级

  • Post author:
  • Post category:其他




8 发布确认高级

由于不明原因导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。



8.1 发布与确认SpringBoot

生产者发送消息到交换机,交换机收到消息后,会触发确认后的回调方法。



8.1.1 配置文件

spring.rabbitmq.publisher-confirm-type=correlated
  • none:默认值,禁用发布确认
  • correlated:发布消息到交换机后触发回调方法
  • simple:有两种效果:第一种效果与correlated一样;其二,单个确认,在发布消息成功后使用rabbitTemplate,调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是

    waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。



8.1.2 配置类及编码

/**
 * 发布确认高级
 */
@Configuration
public class ConfirmConfig {
    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //routingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder
                .durable(CONFIRM_QUEUE_NAME)
                .build();
    }

    //绑定
    @Bean
    public Binding queueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                               @Qualifier("confirmExchange") DirectExchange confirmExchange) 	{
        return BindingBuilder
                .bind(confirmQueue)
                .to(confirmExchange)
                .with(CONFIRM_ROUTING_KEY);
    }
}

生产者

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("发送消息内容:{}", message);
    }

消费者

/**
 * 接收消息
 */
@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm.queue的消息:{}", msg);
    }
}



8.1.3 回调

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机接收到消息后的回调方法
     *
     * @param correlationData 保存了回调的消息ID及相关信息
     * @param ack             true/false:成功/失败
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机成功收到了ID为:{}的消息", id);
        } else {
            log.info("交换机未收到了ID为:{}的消息,原因为:{}", id, cause);
        }
    }
}



8.2 回退消息



8.2.1 Mandatory参数

Mandatory译为强制性的。在仅开启生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,也就是交换机通过routingKey找不到对应的队列,那么此消息会被直接丢弃,此时,生产者是不知道消息被丢弃的。如何让无法被路由的消息被处理;通过设置Mandatory参数可以在当消息传递的过程中不可路由时,将消息返回给生产者。



8.2.2 编码实现

实现RabbitTemplate.ReturnCallback接口,并实现其的方法returnedMessage

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机接收到消息后的回调方法
     *
     * @param correlationData 保存了回调的消息ID及相关信息
     * @param ack             true/false:成功/失败
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机成功收到了ID为:{}的消息", id);
        } else {
            log.info("交换机未收到了ID为:{}的消息,原因为:{}", id, cause);
        }
    }

    /**
     * 当消息不可路由时,返回给生产者
     *
     * @param message    消息
     * @param replyCode  退回码
     * @param replyText  退回原因
     * @param exchange   交换机
     * @param routingKey routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, 
                                String routingKey) {
        log.error("消息:{},被交换机:{}退回,退回原因:{},路由Key:{}", new String(message.getBody()), 
                  exchange, replyText, routingKey);
    }
}



8.3 备份交换机

在上面我们提到如果生产者的消息发送到交换机,在发送的过程中出现了问题,并没有发送到交换机;我们需要进行消息的回退,让消息回退到生产者。这只是一种解决方法,还有一种较好的方法是增加一台备份交换机。让消息不在回退到消费者,而是发送到备份交换机上。



8.3.1 案例

增加备份交换机,并在发送失败时提供警告功能

在这里插入图片描述

配置信息

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
	//备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //警告队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";

	//声明交换机
    @Bean
    public DirectExchange confirmExchange() {
        return ExchangeBuilder
                .directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) //备份交换机参数:指定备份交换机
                .build();
    }

    //声明队列
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder
                .durable(CONFIRM_QUEUE_NAME)
                .build();
    }

    //绑定
    @Bean
    public Binding queueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                               @Qualifier("confirmExchange") DirectExchange confirmExchange)
    {
        return BindingBuilder
                .bind(confirmQueue)
                .to(confirmExchange)
                .with(CONFIRM_ROUTING_KEY);
    }

	//备份交换机
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //备份队列
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    //警报队列
    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    //绑定备份队列与备份交换机
    @Bean
    public Binding backQueueBindingBackExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                @Qualifier("backupExchange") FanoutExchange backupExchange)
    {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    //绑定警报队列与备份交换机
    @Bean
    public Binding warningQueueBindingBackExchange(@Qualifier("warningQueue") Queue warningQueue,
                                                   @Qualifier("backupExchange") FanoutExchange 
                                                   backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

生产者

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, "1 " + message, correlationData);
        log.info("发送消息内容:{}", "1 " + message);

        //模拟不可路由发送的消息
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY + "2", "2 " + message, correlationData2);
        log.info("发送消息内容:{}", "2 " + message);
    }

警报消费者

/**
 * 警报消费者
 */
@Slf4j
@Component
public class WarningConsumer {

    //接收警报信息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}", msg);
    }
}
2022-12-02 23:35:43.526  INFO 76792 --- [nio-8080-exec-1] c.w.r.controller.ProducerController      : 发送消息内容:1 啊啊啊
2022-12-02 23:35:43.528  INFO 76792 --- [nectionFactory1] com.wang.rabbitmq.config.MyCallBack      : 交换机成功收到了ID为:1的消息
2022-12-02 23:35:43.529  INFO 76792 --- [nio-8080-exec-1] c.w.r.controller.ProducerController      : 发送消息内容:2 啊啊啊
2022-12-02 23:35:43.533 ERROR 76792 --- [ntContainer#3-1] c.w.rabbitmq.consumer.WarningConsumer    : 报警发现不可路由消息:2 啊啊啊
2022-12-02 23:35:43.533  INFO 76792 --- [ntContainer#0-1] c.w.rabbitmq.consumer.ConfirmConsumer    : 接收到的队列confirm.queue的消息:1 啊啊啊
2022-12-02 23:35:43.555  INFO 76792 --- [nectionFactory1] com.wang.rabbitmq.config.MyCallBack      : 交换机成功收到了ID为:2的消息




:当回退消息和备份交换机在配置文件application.properties同时存在时,

优先使用备份交换机



版权声明:本文为QQwli原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。