如何保证RabbitMQ的高可用

  • Post author:
  • Post category:其他







提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录




前言

消息中间件在很多场景被广泛使用,伴随着众多的使用,在实际项目中会为我们的数据带来极大的不严谨,所以今天耗费我的LOL时间跟大家分享下我的理解,愿助君一臂之力,如果错误的地方,还望不吝指正

核心思路围绕:

1、发布者确认发送了消息

2、发送到mq的消息持久化

3、消费者确认消费了消息

4、没消费到消息咋办



一、发布者确认


发布者确认

网络可能会以不太明显的方式发生故障,并且检测某些故障

需要时间

。因此,将协议帧或一组帧(例如已发布的消息)写入其套接字的客户端不能假定消息已到达服务器并已成功处理。它可能在途中丢失,或者其交付可能会大大延迟。

使用标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务 – 使通道事务化,然后为每个消息或消息集发布,提交。在这种情况下,事务是不必要的重量级事务,并且会将吞吐量降低 250 倍。为了解决这个问题,引入了一种确认机制。它模仿协议中已经存在的消费者确认机制。

若要启用 confirms,客户端将发送 confirm.select 方法。根据是否设置了“无等待”,代理可能会使用 confirm.select-ok 进行响应。一旦在通道上使用 confirm.select 方法,就说它处于确认模式。事务通道不能进入确认模式,一旦通道进入确认模式,就不能将其设置为事务性通道。

一旦通道处于确认模式,代理和客户端都会对消息进行计数(计数从第一个 confirm.select 的 1 开始)。然后,代理通过在同一通道上发送 basic.ack 来确认消息,因为它在处理消息时会确认消息。传递标记字段包含已确认邮件的序列号。代理还可以在 basic.ack 中设置多个字段,以指示所有消息(包括具有序列号的消息)都已处理。


对已发布的负面确认

在异常情况下,当代理无法成功处理消息时,代理将发送 basic.nack,而不是 basic.ack。在此上下文中,basic.nack 的字段与 basic.ack 中的相应字段具有相同的含义,应忽略重新排队字段。通过确认一条或多条消息,代理表明它无法处理消息并拒绝对它们负责;此时,客户端可以选择重新发布消息。

将频道置于确认模式后,所有随后发布的消息将被确认或取消一次。不保证消息确认的时间。不会同时确认和确认任何消息。

basic.nack 只有在负责队列的 Erlang 进程中发生内部错误时才会传递。

1.Callback

确保消息发送到交换机

    @Test
    public void EDG(){
        String str = "一轮明月正在冉冉升起";
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                    System.out.println("当前" + correlationData.getId() + "消息发送成功");
                } else {
                    System.out.println("当前" + correlationData.getId() + "消息发送失败" + "失败原因" + result.getReason());
                }
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                throw new RuntimeException(ex);
            }
        });
        rabbitTemplate.convertAndSend("vv-exchange","vv-routingkey",str,correlationData);
        System.out.println("消息发送成功");
    }

开始测试:

2.retrunCallback

交换机路由到队列,这里spring创建bean的时候留给了我们自定义的方法,其中有一个接口:ReturnCallback,

还记得之前spring创建bean的流程,实例化bean—>填充属性—->初始化

spring对内置bean属性填充使用Aware,而我们知道了自己的bean名称RabbitTemplate,那简单了直接去spring ioc容器中拿到这个bean,然后定制我想要的效果



二、mq的消息持久化

这里是spring比较贴心,已经帮我们默认消息的持久化


三、消费者确认消费了消息

spring又很贴心,他提供了几个选项给我们

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

四、没消费到消息咋办

消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

我们需要在yaml文件中配置失败重试机制

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 
          multiplier: 1 
          max-attempts: 3 
          stateless: true 

这里重试3次之后mq还是会删除掉,我们还是要考虑重试之后,该消息还没被消费咋办!

amqp为我们准备了消息回收器接口!一共三个感兴趣的话跟进去看下

package cn.vv.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 威威财神
 * @date 2022/4/24 周日
 */
@Configuration
public class TheSpareTireConfig {

    @Bean
    public Queue theSpareTireQueue(){
        return new Queue("theSpareTire.queue", true);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("theSpareTire.exchange",true,false);
    }

    public Binding binding(){
        return BindingBuilder.bind(theSpareTireQueue()).to(directExchange()).with("theSpareTire");
    }

    /**
     *  重试失败后就将失败消息投递到指定的交换机
     *
     * @return {@link RepublishMessageRecoverer }
     * @Author 威威财神
     * @Date 2022/4/24
     */
    public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"theSpareTire.exchange","theSpareTire");
    }
}

这里我使用RepublishMessageRecoverer,重试失败后就将失败消息投递到指定的交换机



总结

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理



版权声明:本文为m0_60524086原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。