RabbitMQ保证消息的可靠性

  • Post author:
  • Post category:其他




RabbitMQ



一、三种可能出现消息丢失的情况及解决办法

1、生产者弄丢了消息

生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败

2、MQ自身弄丢了消息

未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失

开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小

3、消费者弄丢了消息

消费者刚接收到消息还没处理完成,结果消费者挂掉了


解决办法:



1、生产者弄丢消息

生产者在发送数据之前开启RabbitMQ的事务,采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能

1.1、开启confirm模式,实现ConfirmCallback回调接口重写confirm方法

事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制

1.2、交换机无法将消息进行路由时,会将该消息返回给生产者 实现ReturnCallback回调接口重写returnedMessage方法

spring:
  rabbitmq:
      publisher-confirm-type: correlated # 开启消息发送确认机制
      publisher-returns: true # 开启消息无法从交换机到达队列时 交换机退回消息给生产者
/**
 * 生产者确认回调对象
 * spring.rabbitmq.publisher-confirm-type=correlated 高版本
 * 每个RabbitTemplate只支持一个ConfirmCallback
 * @author Cc
 **/
@Slf4j
@Component
public class ProducerAckConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
   

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;

    @PostConstruct
    private void init() {
   
        //设置生产者确认回调对象 this表示调用当前重写的confirm方法
        rabbitTemplate.setConfirmCallback(this);


        /* 
         *true:
         *交换机无法将消息进行路由时,会将该消息返回给生产者
         *false:
         *如果发现消息无法进行路由,则直接丢弃
         */
        rabbitTemplate.setMandatory(true);
        // 设置回退消息交给谁处理
        rabbitTemplate.setReturnCallback(this);
    }


    /**
     * 发送ack确认回调
     * @param correlationData 这里获取唯一id
     * @param ack             是否确认收到(true已确认收到,false未确认收到)
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   
        // 有些没有设置发送应答ack的,不需要走后续的逻辑 CorrelationData对象需要生产者传递
        if (correlationData == null) {
   
            return;
        }

        // 确认方法
        log.info("是否确认发送成功ack = {}  失败原因cause={}", ack, cause);
        // 如果为true,代表mq已成功接收消息
        if (ack) {
   
            //  如果发送交换机成功,但是没有匹配路由到指定的队列, 这个时候ack返回是true(这是一个坑)
            //  解决办法实现ReturnCallback 函数式接口 当消息到达了交换机但没有匹配路由到指定的队列时触发回调接口
            log.info("消息确认发送成功:correlationDataId = {}", correlationData.getId());

        } else {
   
            // 如果为false,代表mq没有接收到消息(消息生产失败)
            // 业务处理(采用定时器进行轮询发送) 不能调用rabbitTemplate发送,会导致线程死锁
            // 解决办法 从Redis中起出对象缓存. 让定时任务轮询发送
            // 调用定时任务服务轮询发送
            // 将错误记录进缓存 后续起出通过Aop记录进日志
            /* Map errorMap = new HashMap();
            errorMap.put("status", "-2");// ack失败
            errorMap.put("errorMsg", cause);
            errorMap.put("correlationData", correlationData);
            redisTemplate.boundHashOps("correlationData").put(correlationData.getId(), errorMap);
           */
            log.error("消息确认发送失败:correlationDataId = {}", correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(Message message



版权声明:本文为Tt5802原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。