问题描述:
使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法????
解决方案:
如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;
@Component
@Slf4j
public class RabbitConfirmCallBack {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//消息未送达队列触发回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法????
//如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。
//并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;
if (StrUtil.equalsAny(exchange, RabbitProducerConfig.DELAYE_EXCHANGE_NAME)) {
return;
}
log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JacksonUtil.bean2Json(message), replyCode, replyText, exchange, routingKey);
MessageVO msg = JacksonUtil.json2Bean(new String(message.getBody()), MessageVO.class);
// 更新数据库 设置消息的状态为发送失败
});
//消息进入到Exchange触发回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// ack ---> true:消息投递成功
log.info("ack ---> {}", ack);
log.info("cause ---> {}", cause);
String msgId = correlationData.getId();
// 1. 如果消息投递不成功
if (!ack) {
log.error("消息投递失败,消息id ---> {}", msgId);
// 2. TODO 查询数据库消息冗余表投递状态为0的消息,定时重新投递
return;
}
// 3. 消息投递成功
log.info("消息成功投递了...");
// 4. TODO 修改数据库消息冗余表修改投递状态为1
});
}
版权声明:本文为zlfjavahome原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。