springboot2.3.1整合RabbitMQ多种工作模式 发送确认,手动应答

  • Post author:
  • Post category:其他



springboot2.3.1整合RabbitMQ


本文中 基础使用了RabbitMQ的几种模式

包括:



基础使用(一个生产者,一个消费者)



WORK 模式 (一个生产者,多个消费者)



FANOUT 模式 队列绑定到交换机



DIRECT 模式 队列绑定到交换机 ,再绑定路由键



TOPIC 模式 队列绑定到交换机 再定义匹配路由键



发送方确认,消费方手动应答



一、RabbitMQ安装

我这里选择的是docker 安装 ,非常快捷



1.拉取对应镜像
docker pull rabbitmq:management


2.启动镜像

1.使用默认guest账户 /密码登录

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management



2.自定义用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 rabbitmq:management

image-20200713225725427



二、与Springboot 整合



依赖以及配置
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

配置:application.yml

对,你没有看错 ,我这里暂时只配了 host 主机 因为我,rabbitmq 安装不在本地 …

为什么我不用配置其他信息?

答:springboot自定装配 rabbitmq有其默认属性,例如host 为 127.0.0.1 账户密码为guest 端口为5672等等,如果是本地demo 可以实现零配置简单运行使用rabbitmq,但是生产环境,还是老老实实改rabbitmq服务器配置(端口,账户密码等 为了安全)

spring:
  rabbitmq:
    host: xxxx
  main:
    allow-bean-definition-overriding: true


简单模式

我这里演示一个生产者,一个消费者

注意 Queue 在 org.springframework.amqp.core 包下即可

队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:20
 * @desc 配置一个队列 
 */
@Configuration
public class EasyRabbitConfig {
    @Bean
    public Queue easyQueue() {
        return new Queue("rabbit_easy_queue");
    }
}

生产者

注入RabbitTemplate 即可快乐的使用我们的小白兔了(RabbitMQ)

import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:21
 */
@Service
public class EasyProviderServer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendEasyMessage() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("rabbit_easy_queue",new Vehicle(i,i+"车车"));
        }
    }

}

消费者

import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:24
 */
@Component
public class EasyConsumer {
    @RabbitListener(queues = "rabbit_easy_queue")
    @RabbitHandler
    public void process(Vehicle vehicle) {
        System.out.println("简单消费者接收到车车消息: " + vehicle);
    }
}

测试

测试时,咱进入我们的rabbirmqWEB 管理页面查看一下

image-20200715215404716

不知道小伙伴们注意到没有,我们的消息是被无序消费了

image-20200715215211811

通过第一个例子,我们来说明一下基本问题

测试完成后,队列依然存在 ,消息被无序消费了



发现的问题:消息无序 ,队列未被删除

原因:

回到最开始,定义队列开始

image-20200715215544592

点进去看 ,原来其默认配置了队列持久化

image-20200715215918806

消息消费问题

我们前边 EasyProviderServer 发送消息是采用的 rabbitTemplate.convertAndSend ,此就是无序发送

怎么有序? 使用 convertSendAndReceive

image-20200715220206317

测试

image-20200715220407098



Work模式

我这里演示,一个生产者 多个消费者

队列

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:30
 * @desc 工作模式 一个生产者 多个消费者
 */
@Configuration
public class WorkRabbitConfig {
    @Bean
    public Queue easyQueue() {
        return new Queue("rabbit_work_queue");
    }
}

生产者发送消息

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:21
 */
@Service
public class WorkProviderServer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendWorkMessage() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("rabbit_work_queue",new Vehicle(i,i+"work车车"));
        }
    }

}

消费者接收消息并消费

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:24
 */
@Component
public class WorkConsumer {
    @RabbitListener(queues = "rabbit_work_queue")
    public void work1(Vehicle vehicle) {
        System.out.println("消费者1--work--接收到车车消息: " + vehicle);
    }
    @RabbitListener(queues = "rabbit_work_queue")
    public void work2(Vehicle vehicle) {
        System.out.println("消费者2--work--接收到车车消息: " + vehicle);
    }
}

测试

一个生产者 多个消费者 生产者生产的消息被平分到消费者 例如 十条消息 有两个消费者 则每个消费者会消费五次

image-20200715221302400



Fanout 模式

又叫无路由键交换机模式

交换机基础使用 ,队列绑定到交换机,当发送消息到交换机时,绑定到该交换机的队列都会监听到

配置 队列 交换机 绑定关系等

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 21:58
 * @desc 发布订阅模式 配置两个队列一个交换机
 */
@Configuration
public class FanoutExchangeConfig {
    /**
     * 队列一
     * @return
     */
    @Bean
    public Queue FanoutQueueOne() {
        return new Queue("rabbit_fanout_queue_one");
    }
    /**
     * 队列二
     * @return
     */
    @Bean
    public Queue FanoutQueueTwo() {
        return new Queue("rabbit_fanout_queue_two");
    }
    /**
     * 交换机 声明为FanoutExchange类型
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange");
    }

    /**
     * 绑定队列一到交换机
     * @param FanoutQueueOne 上方定义的队列一方法名  根据此方法名参数 器会自动注入对应bean
     * @param fanoutExchange 上方定义的交换机方法名
     * @return
     */
    @Bean
    public Binding bindingFanoutExchangeA(Queue FanoutQueueOne, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(FanoutQueueOne).to(fanoutExchange);
    }

    /**
     * 绑定队列二到交换机
     * @param FanoutQueueTwo  上方定义的队列二方法名  根据此方法名参数 器会自动注入对应bean   当
     *                        然也可以省略参数 直接在bind中指定队列构建方法名 例如 FanoutQueueTwo()
     *
     * @param fanoutExchange 上方定义的交换机方法名
     * @return
     */
    @Bean
    public Binding bindingFanoutExchangeB(Queue FanoutQueueTwo, FanoutExchange  fanoutExchange) {
        return BindingBuilder.bind(FanoutQueueTwo).to(fanoutExchange);
    }
}

生产者生产消息

@Service
public class FanoutExchangeProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendFanoutExchangeMessage() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertSendAndReceive("fanout_exchange","",new Vehicle(i,i+"发布订阅车车"));
        }
    }
}

消费者

import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 22:14
 * @desc 发布订阅消费者   一个对了绑定两个消费者
 */
@Component
public class FantoutExchangeConsumer {
    @RabbitListener(queues = "rabbit_fanout_queue_one")
    public void consumerOne(Vehicle vehicle) {
        System.out.println("rabbit_fanout_queue_one队列 消费者1:收到消息---" + vehicle);
    }
    @RabbitListener(queues = "rabbit_fanout_queue_one")
    public void consumerOne2(Vehicle vehicle) {
        System.out.println("rabbit_fanout_queue_one队列 消费者2:收到消息---" + vehicle);
    }
    //-------------一个队列绑定两个消费者 --------------------------------
    @RabbitListener(queues = "rabbit_fanout_queue_two")
    public void consumerTwo(Vehicle vehicle) {
        System.out.println("rabbit_fanout_queue_two队列 消费者1:收到消息---" + vehicle);
    }
    @RabbitListener(queues = "rabbit_fanout_queue_two")
    public void consumerTwo2(Vehicle vehicle) {
        System.out.println("rabbit_fanout_queue_two队列 消费者2:收到消息---" + vehicle);
    }
}

测试:

发布订阅模式 订阅到交换机的队列 都会获取发布的消息,例如我生产者生产消息循环20次 则 队列一 队列二均会被消费10次

注意,我这里使用了convertSendAndReceive ,所以消息是有序的

image-20200715222028261



Direct模式

又叫直连路由键交换机模式,其会直连指定一个路由键与队列 与交换机进行绑定

配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 22:32
 * @desc 路由模式 在发布订阅基础上 添加路由键 吧消息交给符合指定路由键的队列   我这里定义两个队列绑到一个交换机上 对应两个不同的路由键
 */
@Configuration
public class DirectExchangeConfig {
    /**
     * 队列一
     * @return
     */
    @Bean
    public Queue directQueueOne() {
        return new Queue("rabbit_direct_queue_one");
    }

    /**
     * 队列二
     * @return
     */
    @Bean
    public Queue directQueueTwo() {
        return new Queue("rabbit_direct_queue_two");
    }

    /**
     * 定义交换机 direct类型
     * @return
     */
    @Bean
    public DirectExchange myDirectExchange() {
        return new DirectExchange("direct_exchange");
    }

    /**
     * 队列 绑定到交换机 再指定一个路由键
     * directQueueOne() 会找到上方定义的队列bean
     * @return
     */
    @Bean
    public Binding DirectExchangeOne() {
        return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with("lei_routingKey_one");
    }
    /**
     * 队列 绑定到交换机 再指定一个路由键
     * @return
     */
    @Bean
    public Binding DirectExchangeTwo() {
        return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with("lei_routingKey_two");
    }
}

生产者生产消息

@Service
public class DirectExchangeProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 三个参数 交换机 路由键 消息
     */
    public void sendDirectMessageOne() {
        for (int i = 0; i < 5; i++) {
            if (i % 2 == 0) {
                rabbitTemplate.convertAndSend("direct_exchange",
                        "lei_routingKey_one",
                        new Vehicle(i, i + "路由键lei_routingKey_one车车"));
            } else {
                rabbitTemplate.convertAndSend("direct_exchange",
                        "lei_routingKey_two",
                        new Vehicle(i, i + "路由键lei_routingKey_two车车"));
            }

        }
    }
}

消费者

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/14 22:49
 * @desc 路由模式消费者 我这里rabbit_direct_queue_one 使用两个消费者接受消息
 * rabbit_direct_queue_two 使用一个消费者接收消息
 */
@Component
public class DirectExchangeCousumer {
    @RabbitListener(queues = "rabbit_direct_queue_one")
    public void consumerOne(Vehicle vehicle) {
        System.out.println("rabbit_direct_queue_one队列 消费者1:收到消息---" + vehicle);
    }
    @RabbitListener(queues = "rabbit_direct_queue_one")
    public void consumerTwo(Vehicle vehicle) {
        System.out.println("rabbit_direct_queue_one队列 消费者2:收到消息---" + vehicle);
    }
    @RabbitListener(queues = "rabbit_direct_queue_two")
    public void consumerDirect(Vehicle vehicle) {
        System.out.println("rabbit_direct_queue_two队列 :收到消息---" + vehicle);
    }
}

测试:

image-20200715222803528

此效果不明显,我们还可以单独测试两个路由键,使其逻辑清晰点

image-20200715222901508

测试结果贴图

image-20200715223005907

image-20200715223128242



Topic模式

主题模式,其也是交换机模式的一种,与直连路由键交换机的区别在于其可以对交换机做层级匹配,直说可能有点抽象,咱们结合代码

配置

就设定了两个队列 一个 topicExchange交换机 并手动指定了两个路由键

topic.#


topic.*

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author PengLei
 * @date 2020/7/15 0015 9:51
 * @desc topic 主题模式 两个队列 一个topicExchange交换机
 */
@Configuration
public class TopicRabbitConfig {
    /**
     * 队列定义
     * @return
     */
    @Bean
    public Queue topicQueueOne() {
        return new Queue("rabbit_topic_queue_1");
    }
    /**
     * 队列定义
     * @return
     */
    @Bean
    public Queue topicQueueOTwo() {
        return new Queue("rabbit_topic_queue_2");
    }

    /**
     * 定义 TopicExchange 类型交换机
     * @return
     */
    @Bean
    public TopicExchange exchangeTopic() {
        return new TopicExchange("topic_exchange");
    }

    /**
     * 队列一绑定到交换机 且设置路由键为 topic.#
     * @return
     */
    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");
    }
    /**
     * 队列一绑定到交换机 且设置路由键为 topic.*
     * @return
     */
    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(topicQueueOTwo()).to(exchangeTopic()).with("topic.*");
    }
}

生产者生产消息

@Service
public class TopicRabbitProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTopMessage() {
        for (int i = 0; i < 10; i++) {
            if (i%2==0) {
                rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei",new Vehicle(i,i+"一个词路由键车车"));
            }else {
                rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei.xxl",new Vehicle(i,i+"多个词路由键车车"));
            }
        }
    }
}

消费者

@Component
public class TopRabbitConsumer {
    @RabbitListener(queues = "rabbit_topic_queue_1")
    public void listenOne(Vehicle vehicle) {
        System.out.println("监听到队列一消息" + vehicle);
    }
    @RabbitListener(queues = "rabbit_topic_queue_2")
    public void listenOTwo(Vehicle vehicle) {
        System.out.println("监听到队列二消息" + vehicle);
    }
}

测试:

队列一绑定的是 topic.# ,其结果 发送到 路由键 topic.lei.xxl 与 topic.lei 的消息都被接受了  一个词多个词消息都被队列一监听到消费者消费了
队列二 绑定路由键是 topic.* 其结果只有%2==0的消息被接受了,即 topic.lei  仅仅监听到了一个词路由键的消息

* 仅仅会匹配路由键的一个词  # 则可以匹配路由键的多个词

image-20200715225109352

以上为RabbitMQ 中几种模式的使用

但是,我们再开发中或实际生产中,或多或少会出现异常,不会仅仅像我们demo所演示的那般简单以及流畅,那么当出现问题时,例如消息发送成功与否未知,消息未被消息等等

下边,咱们进行简单地操作一下,保证一个生产者消息成功发送,以及消费者消息确认机制



消息发送确认机制,消息消费确认机制

改造咱们的配置 application.yml

spring:
  rabbitmq:
    #我这里仅写了ip 其余端口账号密码由于是演示 采用默认即可,不必要写
    host: xxxx
    # 开启消息确认机制 confirm 异步
    publisher-confirm-type: correlated
    listener:
      direct:
        # 消息开启手动确认
        acknowledge-mode: manual
    publisher-returns: true
  main:
    allow-bean-definition-overriding: true

咱们额外定义一个交换机 以及队列消息 试用下 fanout模式

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/15 20:15
 * @desc 测试confirm 机制,专门创建了一个队列
 */
@Configuration
public class ConfirmRabbitConfig {
    @Bean
    public Queue confirmQueue() {
        return new Queue("rabbit_confirm_queue");
    }
    @Bean
    public FanoutExchange confirmExchange() {
        return new FanoutExchange("confirm_fanout_exchange");
    }
    @Bean
    public Binding confirmFanoutExchangeBing() {
       return BindingBuilder.bind(confirmQueue()).to(confirmExchange());
    }
}

生产者生产消息,配置 confirm消息发送确认机制

package com.leilei.confirm;

import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author lei
 * @version 1.0
 * @date 2020/7/15 20:16
 * @desc
 */
@Service
public class ConfirmServer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 配置 confirm 机制
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         *
         * @param correlationData 消息相关的数据,一般用于获取 唯一标识 id
         * @param b 是否发送成功
         * @param error 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String error) {
            if (b) {
                System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId());
            } else {
                System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error);
            }
        }
    };
    /**
     * 发送消息 参数有:交换机 ,空路由键,消息,并设置一个唯一消息ID
     */
    public void sendConfirm() {
            rabbitTemplate.convertAndSend("confirm_fanout_exchange",
                    "",
                    new Vehicle(1,"confirm功能的车车"),
                    new CorrelationData("" + System.currentTimeMillis()));
                    //使用咱们上方配置的发送回调方法
            rabbitTemplate.setConfirmCallback(confirmCallback);
    }
}

消费者

设置消费者手动应答 先模拟一个正常接收场景

byte数组转对象工具

    public static <T> Optional<T> bytesToObject(byte[] bytes) {
        T t = null;
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        ObjectInputStream sIn;
        try {
            sIn = new ObjectInputStream(in);
            t = (T) sIn.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Optional.ofNullable(t);

    }
@Component
public class ConfirmConsumer {
    @RabbitListener(queues = "rabbit_confirm_queue")
    public void aa(Message message, Channel channel) throws IOException, InterruptedException {
        try {
            System.out.println("正常收到消息:" + bytesToObject(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 两个布尔值  第二个设为 false 则丢弃该消息 设为true 则返回给队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.out.println("消费失败 我此次将返回给队列");
        }
    }
}

测试:

image-20200715230648927
image-20200715230709021

此为消息成功发送,且消息被成功消费的场景

咱们再模拟一个消费失败的场景

代码修改

image-20200715230954917

测试:

image-20200715232157646

查看RabbitMQ web 管理界面

发现对应队列中 一条消息未被消息,无未应答消息

image-20200715232226798

​ 我们再删除掉异常代码,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消息,正常来说,该消息没有被丢弃则下次会继续投递)

image-20200715232439821

image-20200715232538087

那么,可能有时候,我们消息消费失败了,不需要保存到队列,下次从新投递新的消息,这该怎么处理呢?例如我们发送短信,可能短信没发过来,于是再此尝试 那么如果成功了,也该只啊一条短信过来

我们再消费者中设置消息应答 消息丢弃即可

image-20200715232928794

测试:

image-20200715233006189

查看web管理页面 发现也没有未被消息消息,则说明消息真的被丢弃了

image-20200715233032084

总结

### springboot 整合rabbitmq
整合多种模式 

common 为多个模式公共所需 我这里为 一个对象实体


easy 包下 为简单模式 一个生产者 一个消费者

work 包下 为工作模式 一个生产者 多个消费者  多个消费者会轮流获取到队列消息 例如 两个消费者 生产者发送十条消息 则 每个消费者会消费五次

confirm 包下 为rabbitmq 消费发送者确认模式 配合消费者端的手动应答,确保消息被成功发送以及消费

directechange 包下 路由模式  队列绑定到direct交换机再指定路由键 ,生产者发送消息时指定交换机路由键 ,则会被对应队列监听到

fanoutexchange包下 发布订阅模式 队列绑定到fanout交换机 未指定路由键名, 生产者发送消息时指定交换机 路由键指定为空或指定为 "" 则会被所有订阅到交换机的队列监听到

topic 包下 为主题模式  队列绑定到tipic模式交换机 在指定路由键 例如(top.#)或(top.*)
    #  * 区别
    * 仅仅会匹配路由键的一个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.two 则会被对应绑定的队列监听到
    # 则可以匹配路由键的多个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.one.xxl / leilei.one.xxl.eq  只要是以leilei 路由键为前缀的,无论多少个次都会被监听到
    
queue 队列默认为持久化状态
    
发送消息 convertSendAndReceive   convertAndSend 区别
  使用 convertAndSend 方法时的结果:输出时没有顺序,不需要等待,直接运行
  使用 convertSendAndReceive 方法时的结果:输出有顺序 只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有等待间隔时间
  
确保消息成功发送 confirm 模式 如果失败则可以按照自己逻辑处理保存到数据库失败发送表 可后续做补偿
确保消息成功消费 ack 手动应答  失败时按业务 选择从新投递或者丢弃
  

rabbitmq初步的学习就到这里了,随着学习的不断深入,再继续更新

附上项目源码:

springboot2.3.1整合RabbitMQ



版权声明:本文为leilei1366615原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。