RabbitMQ
一、三种可能出现消息丢失的情况及解决办法
1、生产者弄丢了消息
生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败
2、MQ自身弄丢了消息
未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失
开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小
3、消费者弄丢了消息
消费者刚接收到消息还没处理完成,结果消费者挂掉了
解决办法:
1、生产者弄丢消息
生产者在发送数据之前开启RabbitMQ的事务,采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能
1.1、开启confirm模式,实现ConfirmCallback回调接口重写confirm方法
事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制
1.2、交换机无法将消息进行路由时,会将该消息返回给生产者 实现ReturnCallback回调接口重写returnedMessage方法
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启消息发送确认机制
publisher-returns: true # 开启消息无法从交换机到达队列时 交换机退回消息给生产者
/**
* 生产者确认回调对象
* spring.rabbitmq.publisher-confirm-type=correlated 高版本
* 每个RabbitTemplate只支持一个ConfirmCallback
* @author Cc
**/
@Slf4j
@Component
public class ProducerAckConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
@PostConstruct
private void init() {
//设置生产者确认回调对象 this表示调用当前重写的confirm方法
rabbitTemplate.setConfirmCallback(this);
/*
*true:
*交换机无法将消息进行路由时,会将该消息返回给生产者
*false:
*如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
// 设置回退消息交给谁处理
rabbitTemplate.setReturnCallback(this);
}
/**
* 发送ack确认回调
* @param correlationData 这里获取唯一id
* @param ack 是否确认收到(true已确认收到,false未确认收到)
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 有些没有设置发送应答ack的,不需要走后续的逻辑 CorrelationData对象需要生产者传递
if (correlationData == null) {
return;
}
// 确认方法
log.info("是否确认发送成功ack = {} 失败原因cause={}", ack, cause);
// 如果为true,代表mq已成功接收消息
if (ack) {
// 如果发送交换机成功,但是没有匹配路由到指定的队列, 这个时候ack返回是true(这是一个坑)
// 解决办法实现ReturnCallback 函数式接口 当消息到达了交换机但没有匹配路由到指定的队列时触发回调接口
log.info("消息确认发送成功:correlationDataId = {}", correlationData.getId());
} else {
// 如果为false,代表mq没有接收到消息(消息生产失败)
// 业务处理(采用定时器进行轮询发送) 不能调用rabbitTemplate发送,会导致线程死锁
// 解决办法 从Redis中起出对象缓存. 让定时任务轮询发送
// 调用定时任务服务轮询发送
// 将错误记录进缓存 后续起出通过Aop记录进日志
/* Map errorMap = new HashMap();
errorMap.put("status", "-2");// ack失败
errorMap.put("errorMsg", cause);
errorMap.put("correlationData", correlationData);
redisTemplate.boundHashOps("correlationData").put(correlationData.getId(), errorMap);
*/
log.error("消息确认发送失败:correlationDataId = {}", correlationData.getId());
}
}
@Override
public void returnedMessage(Message message
版权声明:本文为Tt5802原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。