SpringBoot集成RabbitMQ实现消息重试机制,消息重试3次失败后写入死信队列,消息重试3次失败后入库

  • Post author:
  • Post category:其他


一、yml配置

spring:
  rabbitmq:
    username: admin
    password: admin
    host: localhost
    port: 5672
    virtual-host: /
    publisher-confirm: true #发布确认 开启confirms回调 Producer -> Exchange
    publisher-returns: true #发布返回 开启returnedMessage回调 Exchange -> Queue
    listener:
      type: simple
      simple:
        acknowledge-mode: manual #消费端收到消息后的确认方式 manual手动确认  none自动确认
        prefetch: 1 #消费者预取1条数据到内存
        default-requeue-rejected: false  #决定被拒绝的消息是否重新入队。默认值为true,需要手动basicNack时这些参数谅失效了
        retry:
          enabled: true  #开启消费者 程序异常情况下会进行重试
          max-attempts: 3 #重试次数
          initial-interval: 2000 #消费者重试间隔次数 2s

2、RabbitConfig

@Autowired
private CachingConnectionFactory connectionFactory;

/**
 * rabbitTemplate
 * @return
 */
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(converter());
    //消息是否成功发送到Exchange
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("消息成功发送至Exchange");
        } else {
            log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
        }
    });
    //触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
    rabbitTemplate.setMandatory(true);
    //失败回调 消息是否从Exchange路由到Queue (只有消息从Exchange路由到Queue失败才会回调这个方法)
    rabbitTemplate.setReturnsCallback(returned ->
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), returned.getMessage())
    );
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter converter() {
    return new Jackson2JsonMessageConverter();
}

/**
 * 普通队列交换机
 */
@Bean
public FanoutExchange testFanoutExchange() {
    return new FanoutExchange(RabbitConstants.TEST_FANOUT_EXCHANGE, true, false);
}
/**
 * 普通队列
 * 基于消息事务的处理方式,当失败消息进行重试,有时间间隔,当达到超时时,就发送到死信队列,等待人工处理
 * @return
 */
@Bean
public Queue testFanoutQueue() {
    //超时2s就放入死信队列
    return QueueBuilder.durable(RabbitConstants.TEST_FANOUT_QUEUE_A).deadLetterExchange(RabbitConstants.TEST_DDL_EXCHANGE).ttl(2000).build();
}
 /**
 * 普通队列绑定到交换机
 * @return
 */
@Bean
public Binding testFanoutBinding() {
    return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange());
}
/**
 * 死信交换机
 * @return
 */
@Bean
public FanoutExchange testFanoutDdlExchange() {
    return new FanoutExchange(RabbitConstants.TEST_DDL_EXCHANGE, true, false);
}
/**
 * 死信队列
 * @return
 */
@Bean
public Queue testFanoutDdlQueue() {
    return new Queue(RabbitConstants.TEST_DDL_QUEUE_A, true, false, false);
}
/**
 * 死信队列绑定至交换机
 * @return
 */
@Bean
public Binding testFanoutDdlBinding() {
    return BindingBuilder.bind(testFanoutDdlQueue()).to(testFanoutDdlExchange());
}

3、消息实体

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="MsgLog对象", description="消息投递日志")
public class MsgLog implements Serializable {

    private static final long serialVersionUID = 1L;

    @ApiModelProperty(value = "消息唯一标识")
    private String msgId;

    @ApiModelProperty(value = "消息体, json格式化")
    private String msg;

    @ApiModelProperty(value = "交换机")
    private String exchange;

    @ApiModelProperty(value = "路由键")
    private String routingKey;

    @ApiModelProperty(value = "状态: 0投递中 1投递成功 2投递失败 3已消费")
    private Integer status;

    @ApiModelProperty(value = "重试次数")
    private Integer tryCount;

    @ApiModelProperty(value = "下一次重试时间")
    private Date nextTryTime;

    @ApiModelProperty(value = "创建时间")
    private Date createTime;

    @ApiModelProperty(value = "更新时间")
    private Date updateTime;

}

4、producer

/**
 * @author : lixuan
 * @date : 2021/04/07/10:55
 * @description: 生产者
 */
public interface ProducerService {
	/**
     * 发送消息
     *
     * @param msg          某些你需要的参数
     * @param exchangeName 队列名称
     * @param routingKey   路由key
     * @return
     */
    void sendMsg(Msg msg, String exchangeName, String routingKey);
}

/**
 * @author : lixuan
 * @date : 2021/04/07/10:57
 * @description: 消息生产者
 */
@Service
public class ProducerServiceImpl implements ProducerService {
 	@Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    public void sendMsg(Msg msg, String exchangeName, String routingKey) {
        if (StringUtils.isBlank(msg.getMsgId())) {
            String msgId = UUID.randomUUID().toString().replaceAll("-", "");
            msg.setMsgId(msgId);
        }
        CorrelationData correlationData = new CorrelationData(msg.getMsgId());
        //发送消息到rabbitMQ
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData);

    }
    
}

5、consumer

/**
 * @author : lixuan
 * @date : 2021/04/07/11:23
 * @description: 消息接收者
 */
@Component
@Slf4j
public class MessageReceiver {

    @Autowired
    private MsgService msgService;

    @Autowired
    private MsgLogService msgLogService;
	
	@RabbitListener(queues = RabbitConstants.TEST_FANOUT_QUEUE_A)
    public void testMsgReceiver(Message message, Channel channel) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Msg msg = JSON.parseObject(message.getBody(), Msg.class);
        try {
            int a = 1 / 0;
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            //重试次数
            Integer retryCount;
            String mapKey = "retry-count";
            if (!headers.containsKey(mapKey)) {
                retryCount = 0;
            } else {
                retryCount = (Integer) headers.get(mapKey);
            }
            if (retryCount++ < RETRY) {
                log.info("已经重试 " + retryCount + " 次");
                headers.put("retry-count", retryCount);
                //当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。
                //这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行
                //而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。
                //因此我们采取的解决方案是,将消息进行应答。
                //这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案
                //1.应答
                channel.basicAck(deliveryTag, false);
                //2.重新发送到MQ中
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("application/json").headers(headers).build();
                channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                        message.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                        message.getBody());
            } else {

                log.info("现在重试次数为:" + retryCount);
                /**
                 * 重要的操作 存盘
                 * 手动ack
                 * channel.basicAck(deliveryTag,false);
                 * 通知人工处理
                 * log.error("重试三次异常,快来人工处理");
                 */
				//消息存盘
                MsgLog msgLog = new MsgLog();
                msgLog.setMsgId(msg.getMsgId());
                msgLog.setMsg(new String(message.getBody(),"utf-8"));
                msgLog.setExchange(message.getMessageProperties().getReceivedExchange());
                msgLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
                msgLog.setTryCount(retryCount);
                msgLog.setStatus(MsgLogStatusEnum.FAIL.getStatus());
                msgLogService.save(msgLog);

                /**
                 * 不重要的操作放入 死信队列
                 * 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预
                 */
                //休眠2s 延迟写入队列,触发转入死信队列
                //Thread.sleep(2000);
                //channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    
	@RabbitListener(queues = RabbitConstants.TEST_DDL_QUEUE_A)
    public void deadTestReceiver(Message message, Channel channel) throws IOException {
        log.info("消息将放入死信队列", new String(message.getBody(), "UTF-8"));
        String str = new String(message.getBody());
        //转换为消息实体
        Msg msg = JSON.parseObject(str, Msg.class);
        log.info("收到的消息为{}", msg);
    }
}

6、写个controller 自己测试一下。

7、postman测试一下

在这里插入图片描述

8、查看控制台

在这里插入图片描述

9、查看数据库

在这里插入图片描述