RabbitMq死信和延时队列(基于死信 and 基于插件)

  • Post author:
  • Post category:其他




死信



什么是死信队列

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效



死信的来源

1:消息 TTL 过期
2:队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3:消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false   



代码演示



消息过期放入死信



消费者A

package com.leava.cloud.deadMsg;

import com.google.common.collect.Maps;
import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


import java.util.Map;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 消费者 死信队列
 */
public class ConsumerO1 {

  //声明普通队列
  public static final String ORDINARY_QUEUE = "ordinary_queue";
  //声明普通交换机
  public static final String ORDINARY_CHANGE = "ordinary_change";
  //声明死信队列
  public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
  //声明死信交换机
  public static final String DEAD_LETTER_CHANGE = "dead_letter_change";
  //声明普通routingKey
  public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";
  //声明死信routingKey
  public static final String DEAD_ROUTING_KEY = "dead_routing_key";

  public static void main(String[] args) throws Exception {
    Channel channel = MqConnectionUtil.getChannel();
    /**
     * 创建普通交换机 类型 为 direct
     */
    channel.exchangeDeclare(ORDINARY_CHANGE, BuiltinExchangeType.DIRECT);

    /**
     * 创建死信交换机 类型 为 direct
     */
    channel.exchangeDeclare(DEAD_LETTER_CHANGE, BuiltinExchangeType.DIRECT);


    /**
     * 创建普通队列 指定 死信交换机 以及死信 routingkey
     */
    Map<String, Object> arguments = Maps.newHashMap();
    arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
    arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
    channel.queueDeclare(ORDINARY_QUEUE, true, false, false, arguments);


    /**
     * 创建死信队列
     */
    channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null);

    System.out.println("等待接收消息......");

    /**
     * 绑定普通队列和普通交换机
     */
    channel.queueBind(ORDINARY_QUEUE, ORDINARY_CHANGE, ORDINARY_ROUTING_KEY);

    /**
     * 绑定死信队列和死信交换机
     */
    channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_CHANGE, DEAD_ROUTING_KEY);

    /**
     * 接收消息 不自动应答
     */
    DeliverCallback deliverCallback = (consumerTag, message) -> {
      System.out.println("接收到消息>>>" + new String(message.getBody()));
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    };
    channel.basicQos(1); //不公平分发
    channel.basicConsume(ORDINARY_QUEUE, false, deliverCallback, (consumerTag -> {
      System.out.println("接收消息失败>>>" + consumerTag);
    }));
  }
}



消费者B

package com.leava.cloud.deadMsg;

import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 消费者 死信队列
 */
public class ConsumerO2 {
  //声明死信队列
  public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

  public static void main(String[] args) throws Exception {
    Channel channel = MqConnectionUtil.getChannel();

    System.out.println("死信队列等待接收消息......");

    /**
     * 接收消息 不自动应答
     */
    DeliverCallback deliverCallback = (consumerTag, message) -> {
      System.out.println("接收到消息>>>" + new String(message.getBody()));
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    };
    channel.basicQos(1); //不公平分发
    channel.basicConsume(DEAD_LETTER_QUEUE, false, deliverCallback, (consumerTag -> {
      System.out.println("接收消息失败>>>" + consumerTag);
    }));
  }
}



生产者

package com.leava.cloud.deadMsg;

import com.leava.cloud.util.MqConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/18 0018
 */
public class Producer {
  //声明普通交换机
  public static final String ORDINARY_CHANGE = "ordinary_change";
  //声明普通routingKey
  public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";

  public static void main(String[] args) throws Exception {
    Channel channel = MqConnectionUtil.getChannel();
    /**
     * 死信队列 设置TTL 过期时间 10S
     */
    AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().expiration("10000").build();
    for (int i = 0; i < 11; i++) {
      String message = "info:"+i;
      channel.basicPublish(ORDINARY_CHANGE,ORDINARY_ROUTING_KEY,build,message.getBytes());
    }
  }
}



演示

测试流程: 先启动 消费者A 再启动消费者B 接着停掉消费者A 启动生产者

在这里插入图片描述
消息先进入普通队列中接着等待10S超过了设置的时候,那么就会进入死信队列

在这里插入图片描述

由死信队列进行消费



队列达到最大长度放入死信

在这里插入图片描述

在原来的消费者A中加入这一行代码即可



演示

删除掉原来的交换机和队列重新启动消费者A再启动消费B 启动生产者

在这里插入图片描述

在这里插入图片描述

可以看到 首次发消息是通过普通队列 之后就会截取每超过6条就会截取到死信队列中



消息被拒绝放入死信

在这里插入图片描述

修改消费者A的代码 拒绝消息为info:5的 将其放入死信



演示

注释掉最大长度放入死信的影响 并且删除掉原来的队列

启动消费者A 和 B 启动生产者

在这里插入图片描述

在这里插入图片描述

可以看到达到了我们的预期



延时队列



基于死信的延时队列



新建SpringBoot项目 在pom文件中加入依赖

我的是微服务项目, 读者可以去掉nacos客户端和feign客户端
   <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--添加nacos客户端-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <!--添加Lombok插件依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>



死信队列配置

package com.leava.cloud.config;

import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 基于死信的延时队列
 */
@Configuration
public class DeadTTlConfig {
  //声明普通交换机
  public static final String ORDINARY_CHANGE = "ordinary_change";

  //声明死信交换机
  public static final String DEAD_LETTER_CHANGE = "dead_letter_change";


  //声明普通队列1
  public static final String ORDINARY_QUEUE = "ordinary_queue";
  //声明普通队列2
  public static final String ORDINARY_QUEUE_TWO = "ordinary_queue_two";

  //声明死信队列
  public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

  //声明普通队列1和普通交换机的 routingKey
  public static final String ORDINARY_ROUTING_KEY = "ordinary_routing_key";

  //声明普通队列2和普通交换机的 routingKey
  public static final String ORDINARY_TWO_ROUTING_KEY = "ordinary_two_routing_key";

  //声明死信交换机和死信队列的routingKey
  public static final String DEAD_ROUTING_KEY = "dead_routing_key";

  /**
   * 创建普通交换机
   */
  @Bean("ordinaryChange")
  public DirectExchange ordinaryChange() {
    return new DirectExchange(ORDINARY_CHANGE);
  }

  /**
   * 创建死信交换机
   */
  @Bean("deadLetterChange")
  public DirectExchange deadLetterChange() {
    return new DirectExchange(DEAD_LETTER_CHANGE);
  }

  /**
   * 创建普通队列1
   */
  @Bean("ordinaryQueue")
  public Queue ordinaryQueue() {
    Map<String, Object> arguments = Maps.newHashMap();
    arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
    arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
    return QueueBuilder.durable(ORDINARY_QUEUE).withArguments(arguments).build();
  }


  /**
   * 创建普通队列2
   */
  @Bean("ordinaryQueueTwo")
  public Queue ordinaryQueueTwo() {
    Map<String, Object> arguments = Maps.newHashMap();
    arguments.put("x-dead-letter-exchange", DEAD_LETTER_CHANGE);
    arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
    return QueueBuilder.durable(ORDINARY_QUEUE_TWO).withArguments(arguments).build();
  }

  /**
   * 创建死信队列
   */
  @Bean("deadLetterQueue")
  public Queue deadLetterQueue() {
    return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
  }

  /**
   * 绑定普通队列1和普通交换机
   */
  @Bean
  public Binding ordinaryQueueBindingOrdinaryChange(
      @Qualifier("ordinaryChange") DirectExchange ordinaryChange,
      @Qualifier("ordinaryQueue") Queue ordinaryQueue) {
    return BindingBuilder.bind(ordinaryQueue).to(ordinaryChange).with(ORDINARY_ROUTING_KEY);
  }

  /**
   * 绑定普通队列2和普通交换机
   */
  @Bean
  public Binding ordinaryQueueTwoBindingOrdinaryChange(
      @Qualifier("ordinaryChange") DirectExchange ordinaryChange,
      @Qualifier("ordinaryQueueTwo") Queue ordinaryQueueTwo) {
    return BindingBuilder.bind(ordinaryQueueTwo).to(ordinaryChange).with(ORDINARY_TWO_ROUTING_KEY);
  }

  /**
   * 绑定死信队列和死信交换机
   */
  @Bean
  public Binding deadQueueTwoBindingOrdinaryChange(
      @Qualifier("deadLetterChange") DirectExchange deadLetterChange,
      @Qualifier("deadLetterQueue") Queue deadLetterQueue) {
    return BindingBuilder.bind(deadLetterQueue).to(deadLetterChange).with(DEAD_ROUTING_KEY);
  }
}

监听器

package com.leava.cloud.monitor;

import com.leava.cloud.config.DeadTTlConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 监听
 */
@Slf4j
@Component
public class DeadTTlQueueListen {


  @RabbitListener(queues = DeadTTlConfig.DEAD_LETTER_QUEUE)
  public void receiveDeadQueue(Message message, Channel channel){
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
  }
}

生产者

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 */
@Slf4j
@RestController
@RequestMapping("/dead/send")
public class DeadController {
  @Autowired
  private RabbitTemplate rabbitTemplate;

  @GetMapping("/message/{msg}/{ttlTime}")
  public void message(@PathVariable("msg") String msg,@PathVariable("ttlTime") String ttlTime){
    log.info("发送消息{},过期时间为:"+ttlTime);
    rabbitTemplate.convertAndSend(DeadTTlConfig.ORDINARY_CHANGE,DeadTTlConfig.ORDINARY_ROUTING_KEY,msg,  correlationData->{
       correlationData.getMessageProperties().setExpiration(ttlTime);
       return correlationData;
    });
  }
}



测试

首先发第一条消息 msg为 baby1 ttlTime为20000 代表20S过期 进入死信队列进行消费

接着发第二条消息 msg为baba2 ttlTime为10000 代表10S过期 进入死信队列进行消费

观察两条消息的运行前后顺序

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


发现一个问题,我们第二次发的信息的ttl是比第一次发的信息的过期时间要短的,应该是先消费第二条,再消费第一条,但是还是遵循这先进先出的原则。



基于插件的延时队列



安装延时队列插件

地址: https://www.rabbitmq.com/community-plugins.html  
下载rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

出现下面这个东西 证明安装成功

在这里插入图片描述



演示

package com.leava.cloud.config;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 基于插件的延时队列
 */
@Configuration
public class DelayedConfig {
  //队列
  public static final String DELAYED_QUEUE_NAME = "delayed.queue.name";
  //交换机
  public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange.name";
  //routingKey
  public static final String DELAYED_ROUTING_KEY = "delayed.routingkey.name";

  /**
   * 自定义交换机
   */
  @Bean
  public CustomExchange customExchanges(){
    Map<String, Object> arguments  = Maps.newHashMap();
    arguments.put("x-delayed-type","direct");
    return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
  }
  /**
   * 创建队列
   */
  @Bean
  public Queue delayedQ(){
    return  QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
  }
  /**
   * 绑定
   */
  @Bean
  public Binding customBindingQ(@Qualifier("customExchanges") CustomExchange customExchanges,
                                @Qualifier("delayedQ") Queue delayedQ){
    return BindingBuilder.bind(delayedQ).to(customExchanges).with(DELAYED_ROUTING_KEY).noargs();
  }
}

package com.leava.cloud.monitor;

import com.leava.cloud.config.DelayedConfig;
import com.leava.cloud.config.DelayedQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;
/**
 * @Description
 * @Author mq
 * @Version V1.0.0
 * @Date 2021/6/19 0019
 * 基于插件的监听器
 */
@Slf4j
@Component
public class DelayedListen {

   @RabbitListener(queues= DelayedConfig.DELAYED_QUEUE_NAME)
   public void receiveDelayedQueue(Message message, Channel channel){
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
  }
}

  @GetMapping("/message_delayed/{msg}/{delayedTime}")
  public void messageDelayed(@PathVariable("msg") String msg,@PathVariable("delayedTime") Integer delayedTime){
    log.info("发送消息{},延时时长为:{}",msg,delayedTime);
    rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE_NAME,DelayedConfig.DELAYED_ROUTING_KEY,msg , correlationData->{
      correlationData.getMessageProperties().setDelay(delayedTime);
      return correlationData;
    });
  }



测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以看到解决了上面那个问题,不再是先进先出的规则



版权声明:本文为yanshenssss原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。