文章目录
springboot2.3.1整合RabbitMQ
本文中 基础使用了RabbitMQ的几种模式
包括:
基础使用(一个生产者,一个消费者)
WORK 模式 (一个生产者,多个消费者)
FANOUT 模式 队列绑定到交换机
DIRECT 模式 队列绑定到交换机 ,再绑定路由键
TOPIC 模式 队列绑定到交换机 再定义匹配路由键
发送方确认,消费方手动应答
一、RabbitMQ安装
我这里选择的是docker 安装 ,非常快捷
1.拉取对应镜像
docker pull rabbitmq:management
2.启动镜像
1.使用默认guest账户 /密码登录
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
2.自定义用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=用户名 -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 rabbitmq:management
二、与Springboot 整合
依赖以及配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
配置:application.yml
对,你没有看错 ,我这里暂时只配了 host 主机 因为我,rabbitmq 安装不在本地 …
为什么我不用配置其他信息?
答:springboot自定装配 rabbitmq有其默认属性,例如host 为 127.0.0.1 账户密码为guest 端口为5672等等,如果是本地demo 可以实现零配置简单运行使用rabbitmq,但是生产环境,还是老老实实改rabbitmq服务器配置(端口,账户密码等 为了安全)
spring:
rabbitmq:
host: xxxx
main:
allow-bean-definition-overriding: true
简单模式
我这里演示一个生产者,一个消费者
注意 Queue 在 org.springframework.amqp.core 包下即可
队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:20
* @desc 配置一个队列
*/
@Configuration
public class EasyRabbitConfig {
@Bean
public Queue easyQueue() {
return new Queue("rabbit_easy_queue");
}
}
生产者
注入RabbitTemplate 即可快乐的使用我们的小白兔了(RabbitMQ)
import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:21
*/
@Service
public class EasyProviderServer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendEasyMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("rabbit_easy_queue",new Vehicle(i,i+"车车"));
}
}
}
消费者
import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:24
*/
@Component
public class EasyConsumer {
@RabbitListener(queues = "rabbit_easy_queue")
@RabbitHandler
public void process(Vehicle vehicle) {
System.out.println("简单消费者接收到车车消息: " + vehicle);
}
}
测试
测试时,咱进入我们的rabbirmqWEB 管理页面查看一下
不知道小伙伴们注意到没有,我们的消息是被无序消费了
通过第一个例子,我们来说明一下基本问题
测试完成后,队列依然存在 ,消息被无序消费了
发现的问题:消息无序 ,队列未被删除
原因:
回到最开始,定义队列开始
点进去看 ,原来其默认配置了队列持久化
消息消费问题
我们前边 EasyProviderServer 发送消息是采用的 rabbitTemplate.convertAndSend ,此就是无序发送
怎么有序? 使用 convertSendAndReceive
测试
Work模式
我这里演示,一个生产者 多个消费者
队列
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:30
* @desc 工作模式 一个生产者 多个消费者
*/
@Configuration
public class WorkRabbitConfig {
@Bean
public Queue easyQueue() {
return new Queue("rabbit_work_queue");
}
}
生产者发送消息
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:21
*/
@Service
public class WorkProviderServer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWorkMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("rabbit_work_queue",new Vehicle(i,i+"work车车"));
}
}
}
消费者接收消息并消费
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:24
*/
@Component
public class WorkConsumer {
@RabbitListener(queues = "rabbit_work_queue")
public void work1(Vehicle vehicle) {
System.out.println("消费者1--work--接收到车车消息: " + vehicle);
}
@RabbitListener(queues = "rabbit_work_queue")
public void work2(Vehicle vehicle) {
System.out.println("消费者2--work--接收到车车消息: " + vehicle);
}
}
测试
一个生产者 多个消费者 生产者生产的消息被平分到消费者 例如 十条消息 有两个消费者 则每个消费者会消费五次
Fanout 模式
又叫无路由键交换机模式
交换机基础使用 ,队列绑定到交换机,当发送消息到交换机时,绑定到该交换机的队列都会监听到
配置 队列 交换机 绑定关系等
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 21:58
* @desc 发布订阅模式 配置两个队列一个交换机
*/
@Configuration
public class FanoutExchangeConfig {
/**
* 队列一
* @return
*/
@Bean
public Queue FanoutQueueOne() {
return new Queue("rabbit_fanout_queue_one");
}
/**
* 队列二
* @return
*/
@Bean
public Queue FanoutQueueTwo() {
return new Queue("rabbit_fanout_queue_two");
}
/**
* 交换机 声明为FanoutExchange类型
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_exchange");
}
/**
* 绑定队列一到交换机
* @param FanoutQueueOne 上方定义的队列一方法名 根据此方法名参数 器会自动注入对应bean
* @param fanoutExchange 上方定义的交换机方法名
* @return
*/
@Bean
public Binding bindingFanoutExchangeA(Queue FanoutQueueOne, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(FanoutQueueOne).to(fanoutExchange);
}
/**
* 绑定队列二到交换机
* @param FanoutQueueTwo 上方定义的队列二方法名 根据此方法名参数 器会自动注入对应bean 当
* 然也可以省略参数 直接在bind中指定队列构建方法名 例如 FanoutQueueTwo()
*
* @param fanoutExchange 上方定义的交换机方法名
* @return
*/
@Bean
public Binding bindingFanoutExchangeB(Queue FanoutQueueTwo, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(FanoutQueueTwo).to(fanoutExchange);
}
}
生产者生产消息
@Service
public class FanoutExchangeProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendFanoutExchangeMessage() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertSendAndReceive("fanout_exchange","",new Vehicle(i,i+"发布订阅车车"));
}
}
}
消费者
import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 22:14
* @desc 发布订阅消费者 一个对了绑定两个消费者
*/
@Component
public class FantoutExchangeConsumer {
@RabbitListener(queues = "rabbit_fanout_queue_one")
public void consumerOne(Vehicle vehicle) {
System.out.println("rabbit_fanout_queue_one队列 消费者1:收到消息---" + vehicle);
}
@RabbitListener(queues = "rabbit_fanout_queue_one")
public void consumerOne2(Vehicle vehicle) {
System.out.println("rabbit_fanout_queue_one队列 消费者2:收到消息---" + vehicle);
}
//-------------一个队列绑定两个消费者 --------------------------------
@RabbitListener(queues = "rabbit_fanout_queue_two")
public void consumerTwo(Vehicle vehicle) {
System.out.println("rabbit_fanout_queue_two队列 消费者1:收到消息---" + vehicle);
}
@RabbitListener(queues = "rabbit_fanout_queue_two")
public void consumerTwo2(Vehicle vehicle) {
System.out.println("rabbit_fanout_queue_two队列 消费者2:收到消息---" + vehicle);
}
}
测试:
发布订阅模式 订阅到交换机的队列 都会获取发布的消息,例如我生产者生产消息循环20次 则 队列一 队列二均会被消费10次
注意,我这里使用了convertSendAndReceive ,所以消息是有序的
Direct模式
又叫直连路由键交换机模式,其会直连指定一个路由键与队列 与交换机进行绑定
配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lei
* @version 1.0
* @date 2020/7/14 22:32
* @desc 路由模式 在发布订阅基础上 添加路由键 吧消息交给符合指定路由键的队列 我这里定义两个队列绑到一个交换机上 对应两个不同的路由键
*/
@Configuration
public class DirectExchangeConfig {
/**
* 队列一
* @return
*/
@Bean
public Queue directQueueOne() {
return new Queue("rabbit_direct_queue_one");
}
/**
* 队列二
* @return
*/
@Bean
public Queue directQueueTwo() {
return new Queue("rabbit_direct_queue_two");
}
/**
* 定义交换机 direct类型
* @return
*/
@Bean
public DirectExchange myDirectExchange() {
return new DirectExchange("direct_exchange");
}
/**
* 队列 绑定到交换机 再指定一个路由键
* directQueueOne() 会找到上方定义的队列bean
* @return
*/
@Bean
public Binding DirectExchangeOne() {
return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with("lei_routingKey_one");
}
/**
* 队列 绑定到交换机 再指定一个路由键
* @return
*/
@Bean
public Binding DirectExchangeTwo() {
return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with("lei_routingKey_two");
}
}
生产者生产消息
@Service
public class DirectExchangeProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 三个参数 交换机 路由键 消息
*/
public void sendDirectMessageOne() {
for (int i = 0; i < 5; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("direct_exchange",
"lei_routingKey_one",
new Vehicle(i, i + "路由键lei_routingKey_one车车"));
} else {
rabbitTemplate.convertAndSend("direct_exchange",
"lei_routingKey_two",
new Vehicle(i, i + "路由键lei_routingKey_two车车"));
}
}
}
}
消费者
/**
* @author lei
* @version 1.0
* @date 2020/7/14 22:49
* @desc 路由模式消费者 我这里rabbit_direct_queue_one 使用两个消费者接受消息
* rabbit_direct_queue_two 使用一个消费者接收消息
*/
@Component
public class DirectExchangeCousumer {
@RabbitListener(queues = "rabbit_direct_queue_one")
public void consumerOne(Vehicle vehicle) {
System.out.println("rabbit_direct_queue_one队列 消费者1:收到消息---" + vehicle);
}
@RabbitListener(queues = "rabbit_direct_queue_one")
public void consumerTwo(Vehicle vehicle) {
System.out.println("rabbit_direct_queue_one队列 消费者2:收到消息---" + vehicle);
}
@RabbitListener(queues = "rabbit_direct_queue_two")
public void consumerDirect(Vehicle vehicle) {
System.out.println("rabbit_direct_queue_two队列 :收到消息---" + vehicle);
}
}
测试:
此效果不明显,我们还可以单独测试两个路由键,使其逻辑清晰点
测试结果贴图
Topic模式
主题模式,其也是交换机模式的一种,与直连路由键交换机的区别在于其可以对交换机做层级匹配,直说可能有点抽象,咱们结合代码
配置
就设定了两个队列 一个 topicExchange交换机 并手动指定了两个路由键
topic.#
topic.*
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author PengLei
* @date 2020/7/15 0015 9:51
* @desc topic 主题模式 两个队列 一个topicExchange交换机
*/
@Configuration
public class TopicRabbitConfig {
/**
* 队列定义
* @return
*/
@Bean
public Queue topicQueueOne() {
return new Queue("rabbit_topic_queue_1");
}
/**
* 队列定义
* @return
*/
@Bean
public Queue topicQueueOTwo() {
return new Queue("rabbit_topic_queue_2");
}
/**
* 定义 TopicExchange 类型交换机
* @return
*/
@Bean
public TopicExchange exchangeTopic() {
return new TopicExchange("topic_exchange");
}
/**
* 队列一绑定到交换机 且设置路由键为 topic.#
* @return
*/
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");
}
/**
* 队列一绑定到交换机 且设置路由键为 topic.*
* @return
*/
@Bean
public Binding bindingTopic2() {
return BindingBuilder.bind(topicQueueOTwo()).to(exchangeTopic()).with("topic.*");
}
}
生产者生产消息
@Service
public class TopicRabbitProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTopMessage() {
for (int i = 0; i < 10; i++) {
if (i%2==0) {
rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei",new Vehicle(i,i+"一个词路由键车车"));
}else {
rabbitTemplate.convertSendAndReceive("topic_exchange","topic.lei.xxl",new Vehicle(i,i+"多个词路由键车车"));
}
}
}
}
消费者
@Component
public class TopRabbitConsumer {
@RabbitListener(queues = "rabbit_topic_queue_1")
public void listenOne(Vehicle vehicle) {
System.out.println("监听到队列一消息" + vehicle);
}
@RabbitListener(queues = "rabbit_topic_queue_2")
public void listenOTwo(Vehicle vehicle) {
System.out.println("监听到队列二消息" + vehicle);
}
}
测试:
队列一绑定的是 topic.# ,其结果 发送到 路由键 topic.lei.xxl 与 topic.lei 的消息都被接受了 一个词多个词消息都被队列一监听到消费者消费了
队列二 绑定路由键是 topic.* 其结果只有%2==0的消息被接受了,即 topic.lei 仅仅监听到了一个词路由键的消息
* 仅仅会匹配路由键的一个词 # 则可以匹配路由键的多个词
以上为RabbitMQ 中几种模式的使用
但是,我们再开发中或实际生产中,或多或少会出现异常,不会仅仅像我们demo所演示的那般简单以及流畅,那么当出现问题时,例如消息发送成功与否未知,消息未被消息等等
下边,咱们进行简单地操作一下,保证一个生产者消息成功发送,以及消费者消息确认机制
消息发送确认机制,消息消费确认机制
改造咱们的配置 application.yml
spring:
rabbitmq:
#我这里仅写了ip 其余端口账号密码由于是演示 采用默认即可,不必要写
host: xxxx
# 开启消息确认机制 confirm 异步
publisher-confirm-type: correlated
listener:
direct:
# 消息开启手动确认
acknowledge-mode: manual
publisher-returns: true
main:
allow-bean-definition-overriding: true
咱们额外定义一个交换机 以及队列消息 试用下 fanout模式
/**
* @author lei
* @version 1.0
* @date 2020/7/15 20:15
* @desc 测试confirm 机制,专门创建了一个队列
*/
@Configuration
public class ConfirmRabbitConfig {
@Bean
public Queue confirmQueue() {
return new Queue("rabbit_confirm_queue");
}
@Bean
public FanoutExchange confirmExchange() {
return new FanoutExchange("confirm_fanout_exchange");
}
@Bean
public Binding confirmFanoutExchangeBing() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange());
}
}
生产者生产消息,配置 confirm消息发送确认机制
package com.leilei.confirm;
import com.leilei.common.Vehicle;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author lei
* @version 1.0
* @date 2020/7/15 20:16
* @desc
*/
@Service
public class ConfirmServer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 配置 confirm 机制
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 消息相关的数据,一般用于获取 唯一标识 id
* @param b 是否发送成功
* @param error 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String error) {
if (b) {
System.out.println("confirm 消息发送确认成功...消息ID为:" + correlationData.getId());
} else {
System.out.println("confirm 消息发送确认失败...消息ID为:" + correlationData.getId() + " 失败原因: " + error);
}
}
};
/**
* 发送消息 参数有:交换机 ,空路由键,消息,并设置一个唯一消息ID
*/
public void sendConfirm() {
rabbitTemplate.convertAndSend("confirm_fanout_exchange",
"",
new Vehicle(1,"confirm功能的车车"),
new CorrelationData("" + System.currentTimeMillis()));
//使用咱们上方配置的发送回调方法
rabbitTemplate.setConfirmCallback(confirmCallback);
}
}
消费者
设置消费者手动应答 先模拟一个正常接收场景
byte数组转对象工具
public static <T> Optional<T> bytesToObject(byte[] bytes) {
T t = null;
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn;
try {
sIn = new ObjectInputStream(in);
t = (T) sIn.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return Optional.ofNullable(t);
}
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "rabbit_confirm_queue")
public void aa(Message message, Channel channel) throws IOException, InterruptedException {
try {
System.out.println("正常收到消息:" + bytesToObject(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 两个布尔值 第二个设为 false 则丢弃该消息 设为true 则返回给队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("消费失败 我此次将返回给队列");
}
}
}
测试:
此为消息成功发送,且消息被成功消费的场景
咱们再模拟一个消费失败的场景
代码修改
测试:
查看RabbitMQ web 管理界面
发现对应队列中 一条消息未被消息,无未应答消息
我们再删除掉异常代码,再次使用生产者发送消息,查看是否会消费两次(之前有一条消息未被消息,正常来说,该消息没有被丢弃则下次会继续投递)
那么,可能有时候,我们消息消费失败了,不需要保存到队列,下次从新投递新的消息,这该怎么处理呢?例如我们发送短信,可能短信没发过来,于是再此尝试 那么如果成功了,也该只啊一条短信过来
我们再消费者中设置消息应答 消息丢弃即可
测试:
查看web管理页面 发现也没有未被消息消息,则说明消息真的被丢弃了
总结
### springboot 整合rabbitmq
整合多种模式
common 为多个模式公共所需 我这里为 一个对象实体
easy 包下 为简单模式 一个生产者 一个消费者
work 包下 为工作模式 一个生产者 多个消费者 多个消费者会轮流获取到队列消息 例如 两个消费者 生产者发送十条消息 则 每个消费者会消费五次
confirm 包下 为rabbitmq 消费发送者确认模式 配合消费者端的手动应答,确保消息被成功发送以及消费
directechange 包下 路由模式 队列绑定到direct交换机再指定路由键 ,生产者发送消息时指定交换机路由键 ,则会被对应队列监听到
fanoutexchange包下 发布订阅模式 队列绑定到fanout交换机 未指定路由键名, 生产者发送消息时指定交换机 路由键指定为空或指定为 "" 则会被所有订阅到交换机的队列监听到
topic 包下 为主题模式 队列绑定到tipic模式交换机 在指定路由键 例如(top.#)或(top.*)
# * 区别
* 仅仅会匹配路由键的一个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.two 则会被对应绑定的队列监听到
# 则可以匹配路由键的多个词 如果生产者发送消息到路由键 例如 leilei.one / leilei.one.xxl / leilei.one.xxl.eq 只要是以leilei 路由键为前缀的,无论多少个次都会被监听到
queue 队列默认为持久化状态
发送消息 convertSendAndReceive convertAndSend 区别
使用 convertAndSend 方法时的结果:输出时没有顺序,不需要等待,直接运行
使用 convertSendAndReceive 方法时的结果:输出有顺序 只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有等待间隔时间
确保消息成功发送 confirm 模式 如果失败则可以按照自己逻辑处理保存到数据库失败发送表 可后续做补偿
确保消息成功消费 ack 手动应答 失败时按业务 选择从新投递或者丢弃
rabbitmq初步的学习就到这里了,随着学习的不断深入,再继续更新
附上项目源码:
springboot2.3.1整合RabbitMQ