使用DelayExchange插件实现RabbitMQ延迟队列

  • Post author:
  • Post category:其他




1、安装DelayExchange插件


1.1、下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

在这里插入图片描述

下载与自己安装的RabbitMQ版本对应的DelayExchange插件,如3.8.9版本的插件对应RabbitMQ的3.8.5以上版本。

在这里插入图片描述



1.2、上传插件


1.2.1 进入RabbitMQ插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.10/plugins

如果是docker安装的RabbitMQ,使用下面命令查看数据卷,然后进入插件目录

docker volume inspect mq-plugins

在这里插入图片描述



1.2.2 将下载的插件上传到该目录


1.2.3 然后启动插件

docker需要进入RabbitMQ容器内部

docker exec -it 容器名 bash

启动插件命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

如果上传的版本不对这里会报错,重新下载上传合适的插件版本即可

最后重启RabbitMQ或容器

此时进入RabbitMQ管理页面可以看到能选择该类型,代表安装成功

在这里插入图片描述



2、声明DelayExchange交换机
	//声明DelayExchange交换机,处理延迟消息
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "my.delay.queue", durable = "true"),
            exchange = @Exchange(name = "my.delay.direct",delayed = "true"),
            key = "delay"
    ))
    public void listenDelayQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception{
        System.err.println("接收到的延迟消息:" + msg);
        channel.basicAck(deliveryTag, false);//如果开启了手动确认机制,则需要手动ack
}


3、发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间

    /**
     * 发送延迟消息
     * @param msg 待处理消息
     * @param time 延迟时间ms
     */
    public void sendDelayQueue(String msg,Long time){
        // 创建消息
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay",time)
                .build();
        // 消息ID,需要封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(IdUtils.simpleUUID());
        // 发送消息
        rabbitTemplate.convertAndSend("my.delay.direct", "delay", message, correlationData);
        System.out.println("发送消息成功");
    }



版权声明:本文为qq_41810002原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。