RabbitMQ:RabbitMQ 延迟队列插件强制调用 ReturnCallback 里 returnedMessage 方法

  • Post author:
  • Post category:其他




问题描述:

使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法????



解决方案:




如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。


并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;

@Component
@Slf4j
public class RabbitConfirmCallBack {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //消息未送达队列触发回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法????
            //如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
            //并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;
            if (StrUtil.equalsAny(exchange, RabbitProducerConfig.DELAYE_EXCHANGE_NAME)) {
                return;
            }
            log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JacksonUtil.bean2Json(message), replyCode, replyText, exchange, routingKey);
            MessageVO msg = JacksonUtil.json2Bean(new String(message.getBody()), MessageVO.class);
            // 更新数据库 设置消息的状态为发送失败

        });

        //消息进入到Exchange触发回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // ack ---> true:消息投递成功
            log.info("ack ---> {}", ack);
            log.info("cause ---> {}", cause);
            String msgId = correlationData.getId();

            // 1. 如果消息投递不成功
            if (!ack) {
                log.error("消息投递失败,消息id ---> {}", msgId);
                // 2. TODO 查询数据库消息冗余表投递状态为0的消息,定时重新投递
                return;
            }

            // 3. 消息投递成功
            log.info("消息成功投递了...");
            // 4. TODO 修改数据库消息冗余表修改投递状态为1
        });
    }



版权声明:本文为zlfjavahome原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。