RabbitMQ
1. 消息中间件
1.1 概述
1. 消息中间件
- 利用可靠的消息传递机制进行系统和系统之间直接的通讯
- 通过提供消息传递和消息排队机制,可以在分布式系统环境下扩展进程间的通讯
2. 应用场景
- 跨系统数据传递
- 高并发的流量削峰
- 数据的分发和异步处理
- 大数据分析与传递
- 分布式事务
3. 常见的消息中间件
ActiveMQ、RabbitMQ、Kafka、RocketMQ等
4. 本质和设计
是一种接收数据,接受请求,存储数据,发送数据等功能的技术服务,MQ消息队列,负责数据的发送接收,存储和传递,所以性能要高于普通服务和技术
5. 核心组成部分
- 消息的协议
- 消息的持久化机制
- 消息的分发策略
- 消息的高可用、高可靠
- 消息的容错机制
1.2 消息队列协议
协议
- 计算机底层操作系统和应用程序通讯时都要共同遵守的一组约定,只有遵守共同的约定和规范,系统和底层的操作系统之间才能沟通
- 和一般的网络应用程序不同,它主要负责数据的接受和传递,性能比较高
- 协议对数据格式和计算机之间交换的数据必须严格规范
三要素
- 语法 用户数据与控制信息的结构和格式,以及数据出现的顺序
- 语义 解释控制信息每个部分的意义,对何时发出控制信息,完成什么动作,做出什么响应进行规范
- 时序 事件的发生顺序
# 语法:http规定了请求报文和响应报文的格式
# 语义:客户端主动发起请求 get/post请求
# 时序:一个请求对应一个响应
为什么消息中间件不直接使用http协议?
- 因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加解密等附加功能,对于消息而言,太复杂,没必要
- 大部分http都是短链接,在实际的交互过程中,一个请求的响应很可能会中断,中断后就不会持久化,造成请求丢失,不适合消息中间件,因为消息中间件可能是一个长期获取消息的过程,出现问题和故障要对数据或消息进行持久化。
AMQP协议
高级消息队列协议
支持者有 RabbitMQ、ACTIVEMQ
MQTP协议
即时通讯协议,物联网系统架构中的重要组成部分。
特点
- 轻量
- 结构简单
- 传输快,不支持事务
- 没有持久化设计
应用场景:计算能力有限,低带宽,网络不稳定的场景
支持者:RabbitMQ、ACTIVEMQ
OpenMessage协议
由阿里、雅虎、滴滴等创立的分布式消息中间件
Kafka协议
基于TCP/IP的二进制协议,消息内部通过长度进行分割,由一些基本数据类型组成
特点
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
1.3 持久化
将数据存入磁盘,而不是存在内存中随服务器的重启或者关机而消失,使数据能够永久保存。防止消息中间件该点出现故障重启后仍然能够接收、存储、分发消息。
常见方式
ActiveMQ | RabbitMQ | Kafaka | RocketMQ | |
---|---|---|---|---|
文件存储 | √ | √ | √ | √ |
数据库 | √ | × | × | × |
1.4 分发策略
角色:
- 生产者
- 存储消息
- 消费者
机制
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
发布订阅 | √ | √ | √ | √ |
轮询分发 | √ | √ | √ | × |
公平分发 | × | √ | √ | × |
重发 | √ | √ | × | √ |
消息拉取 | × | √ | √ | √ |
1.5 高可用和高可靠
高可用:在规定的条件内处于可执行规定功能状态的能力,无故障持续运行的能力。消息中间件必须支持集群部署。
高可靠:消息的传输可靠,通过协议来保证系统间数据解析的正确性;消息的存储可靠,通过持久化来保证消息的可靠性
- Master-slave主从共享数据的部署方式
生产者将消息发送到Master节点,所有连接这个节点的消息队列共享这块数据区域,Master节点负责写入,一旦Mater挂掉,slave节点继续服务。
- Master-slave主从同步部署方式
Master节点负责写入,但是会同步数据到slave节点形成副本,和zookeeper、redis主从机制很类同,如果消费者有多个,这样就可以取不同的节点进行消费,但消息的拷贝和同步会占用很大的带宽和网络资源。
- 多主集群同步部署方式
跟方式二类似,但是这个可以在多个节点进行写
- 多主集群转发部署模式
消费者访问slave进行消费,但是该节点没有对应的信息,此时会询问另一个节点有没有。
- Mater-slave与Breoker-cluster组合的方案
总结为三种模式
- 消息共享
- 消息同步(复制)
- 元数据共享
2. RabbitMQ入门和安装
RabbitMQ是一个开源的遵循AMQP协议的基于Erlang语言编写,支持多种客户端,用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性。
- 下载地址:
https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm
https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.2.6-1.el7.x86_64.rpm
- 将两个文件上传Linux
erlang-23.2.6-1.el7.x86_64.rpm # 注意此处 el7 7指的是centos的版本
rabbitmq-server-3.8.13-1.el7.noarch.rpm
- 安装
[root@summer1245 rabibitmq]# [root@summer1245 rabbitmq]# rpm -Uvh erlang-23.2.6-1.el7.x86_64.rpm
[root@summer1245 rabibitmq]# yum install -y erlang
[root@summer1245 rabibitmq]# erl -v
Erlang/OTP 24 [erts-12.3.2.1] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1]
Eshell V12.3.2.1 (abort with ^G)
ctrl + v 退出
[root@summer1245 rabibitmq]# yum install -y socat
[root@summer1245 rabibitmq]# rpm -Uvh rabbitmq-server-3.8.13-1.el7.noarch.rpm
[root@summer1245 rabibitmq]# yum install rabbitmq-server -y
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 开机启动服务
systemctl enable rabbitmq-server
3. RabbitMQWeb管理界面
默认情况下RabbitMQ是没有安装web端的客户端插件,需要安装
rabbitmq-plugins enable rabbitmq_management
# 安装完之后重启
systemctl restart rabbitmq-server
# 访问公网ip+15672 目前不能登录需要授权
# 需要注意的是 需要添加阿里云安全组 5672和15672
[root@summer1245 rabibitmq]# firewall-cmd --list-ports
6379/tcp
[root@summer1245 rabibitmq]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
success
[root@summer1245 rabibitmq]# systemctl restart firewalld.service
# 新增用户
rabbitmqctl add_user admin admin
# 分配权限
rabbitmqctl set_user_tags admin administrator
# administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
# monitoring 监控者 登录控制台,查看所有信息
# policymaker 策略制定者 登录控制台,指定策略
# managment 普通管理员 登录控制台
# 添加资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 相关命令
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator
rabbitmqctl change_password Username Newpassword # 修改密码
rabbitmqctl delete_user Username # 删除用户
rabbitmqctl list_users # 查看用户清单
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" # 为用户设置administrator角色
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
4. Docker安装RabbitMQ
docker pull rabbitmq:management
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
docker start 容器id
# 这里端口被占用,是因为之前下载的那个rabbitmq服务正在启动,关闭即可
5. RabbitMQ的角色分类
none
- 不能访问management plugin
management
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息
Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息
Monitoring
- 包含management所有权限
- 列出所有的virtual hosts,包括不能登录的virtual hosts
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计信息
Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
6. 入门案例
此处需要开启5672和15672端口,阿里云和xshell都需要开
- 创建一个maven项目
- 导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
生产者
package com.hua.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("生产者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 通过创建交换机,声明队列,绑定关系,路由Key,发送消息和接收消息
String queueName = "queue1";
/**
* @param1 队列名
* @param2 是否持久化 存盘 如果为true,那么服务器重启队列依然存在,为false重启就不存在了
* @param3 是否排他 是否独占
* @param4 是否自动删除,如果为true最后一个消费者消费完消息后,自动删除
* @param5 携带附属参数
*
* */
channel.queueDeclare(queueName,false,false,true,null);
// 5. 准备消息内容
String message = "hello world";
// 6. 发送消息给队列
/**
* @param1 交换机 不存在没有交换机的队列,虽然没有指定,但是有个默认的
* @param2 队列名
* @param3 消息状态空值
* @param4 消息内容
* */
channel.basicPublish("",queueName,null,message.getBytes());
}catch (Exception e){
e.printStackTrace();
}finally {
// 7. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者
package com.hua.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("消费者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 通过创建交换机,声明队列,绑定关系,路由Key,发送消息和接收消息
channel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("接收的消息是"+new String(delivery.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
});
System.in.read();
}catch (Exception e){
e.printStackTrace();
}finally {
// 7. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
设置自动删除的队列,当最后一个消费者消费完消息并断开连接后,队列会自动删除
持久化队列当服务器重启之后,队列仍然存在;非持久化队列当服务器重启之后,队列会消失。
7. AMQP协议
图片转载于:https://www.kuangstudy.com/zl/rabbitmq#1366670562620985345
8. 组件和架构
核心概念:
Server
:又称Broker,接受客户端的连接,实现AMQP实体服务。
Connection
:连接,应用程序与Broker的网络连接 TCP/IP 三次握手和四次挥手。
Channel
:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立Channel,每个Channel代表一个会话任务。
Message
:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host
:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个Exhange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。
Exchange
:交换机,接受消息,根据路由发送消息到绑定的队列。(
不具备消息存储的能力
)
Bindings
:Exchange和Queue之间的虚拟连接,binding中可以设置多个routing key。
Routing key
:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue
:队列,也称为Message Queue消息队列,保存消息并将它们转发给消费者。
运行流程
图片转载于:https://www.kuangstudy.com/zl/rabbitmq#1366670562620985345
9. 工作模式
9.1 简单模式
如果在图形化界面打不开队列,就换个浏览器
消息会先往交换机发送,然后交换机依据路由key分发消息到指定队列,如果不指定交换机,会发送到默认交换机的
9.2 Fanout模式
发布与订阅模式 只要是绑定了该路由的队列,那么该路由发送的消息,这些队列都能够收到
之后可以看到三个队列都会收到消息
生产者
package com.hua.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("生产者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 准备发送消息的内容
String message = "hello world";
// 5. 准备交换机
String exchange = "fanout-exchange";
// 6. 定义路由key
String routeKey = "";
// 7. 执行交换机的类型
String type = "fanout"; // 只是做标记,并没有用上
// 8. 发送消息给队列
channel.basicPublish(exchange,routeKey,null,message.getBytes());
}catch (Exception e){
e.printStackTrace();
}finally {
// 9. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者
package com.hua.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("消费者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 通过创建交换机,声明队列,绑定关系,路由Key,发送消息和接收消息
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + ":接收的消息是" + new String(delivery.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println(queueName + ": 开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
System.out.println("接收消息出现异常");
} finally {
// 7. 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
new Thread(runnable,"queue4").start();
}
}
// queue2:接收的消息是hello world
// queue3:接收的消息是hello world
// queue1:接收的消息是hello world
9.3 Direct模式 (默认)
路由模式 增加了路由key,发送消息时会指定路由key,之后只有绑定该路由的队列有相应的路由key才能接收到消息
此处只有queue1和queue3才能够接收到email消息
生产者
package com.hua.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("生产者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 准备发送消息的内容
String message = "hello world";
// 5. 准备交换机
String exchange = "direct-exchange";
// 6. 定义路由key
String routeKey = "email";
// 7. 发送消息给队列
channel.basicPublish(exchange,routeKey,null,message.getBytes());
}catch (Exception e){
e.printStackTrace();
}finally {
// 8. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 9. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
//queue1:接收的消息是hello world
//queue3:接收的消息是hello world
9.4 Topic模式
*有且必须要有一级
#可以是0级或者是多级
一个.代表一级
如果路由key是这样,com.cource.order,那么queue1,2,3都可以收到消息
.cource.可以发往queue2,cource不可以
com可以发往queue1
生产者
package com.hua.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("生产者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 准备发送消息的内容
String message = "hello world";
// 5. 准备交换机
String exchange = "topic-change";
// 6. 定义路由key
String routeKey = "com.cource.order";
// 7. 指定交换机的类型
String type = "topic"; // 此处并没有用到,因为已经使用可视化界面指定类型了
// 8. 发送消息给队列
channel.basicPublish(exchange,routeKey,null,message.getBytes());
}catch (Exception e){
e.printStackTrace();
}finally {
// 9. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// queue1:接收的消息是hello world
// queue2:接收的消息是hello world
// queue3:接收的消息是hello world
9.5 Headers模式
根据参数条件来发送消息
9.6 完整方式
以上代码实现方式都没有将队列与交换机进行绑定,因为已经用可视化界面绑定了。
接下来用代码方式声明交换机,声明队列,绑定交换机和队列,往队列发送消息
package com.hua.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1. 创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2. 创建一个连接 connection
connection = connectionFactory.newConnection("生产者");
// 3. 通过连接获取通道Channel
channel = connection.createChannel();
// 4. 准备发送消息的内容
String message = "hello world";
// 5. 准备交换机
String exchange = "direct-order";
// 6. 定义路由key
String routeKey = "order";
// 7. 定义交换机的类型
String type = "direct";
// 8. 声明一个交换机 交换机名字,类型,是否持久化
channel.exchangeDeclare(exchange,type,true);
// 9. 声明队列 队列名.是否持久化.排他性.自动删除.参数
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);
// 10.绑定关系 队列名 交换机名 routekey
channel.queueBind("queue5",exchange,"order");
channel.queueBind("queue6",exchange,"order");
channel.queueBind("queue7",exchange,"course");
// 11. 发送消息给队列
channel.basicPublish(exchange,routeKey,null,message.getBytes());
}catch (Exception e){
e.printStackTrace();
}finally {
// 9. 关闭通道
if(channel!=null&&channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 10. 关闭连接
if (connection!=null&&connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// queue5:接收的消息是hello world
// queue6:接收的消息是hello world
9.7 Work轮询模式
轮询模式需要注意的是应答必须开启
自动应答
一个消费者一条,按均分配
生产者
package com.hua.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "test:" + i;
// 7: 发送消息给中间件rabbitmq-server
// 此处绑定默认的交换机,其中路由key就是队列的名字
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者1
package com.hua.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 定义接受消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000); // 模拟服务器性能差
} catch (Exception ex) {
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2
package com.hua.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 定义接受消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1); 此处的应答必须是自动应答
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200); // 模拟服务器性能好
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 6: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
先启动两个消费者,之后启动生产者,出现以下结果
Work1-开始接受消息
Work1-收到消息是:test:1
Work1-收到消息是:test:3
Work1-收到消息是:test:5
Work1-收到消息是:test:7
Work1-收到消息是:test:9
Work1-收到消息是:test:11
Work1-收到消息是:test:13
Work1-收到消息是:test:15
Work1-收到消息是:test:17
Work1-收到消息是:test:19
Work2-开始接受消息
Work2-收到消息是:test:2
Work2-收到消息是:test:4
Work2-收到消息是:test:6
Work2-收到消息是:test:8
Work2-收到消息是:test:10
Work2-收到消息是:test:12
Work2-收到消息是:test:14
Work2-收到消息是:test:16
Work2-收到消息是:test:18
Work2-收到消息是:test:20
// 可以看到不管消费者的性能差异如何,始终是轮流接收到消息的
// 需要注意的是应答必须开启自动应答
9.8 Work公平分发模式
根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
这里应答方式需要改为
手动应答
finalChannel.basicQos(1); // 一次性从队列中取出多少个消息,很显然,相同时间内性能高的消费者消费的消息多
// 这个值不能设置太大,如果太大,消息不多的情况下,性能高的消费者可能一次性把消息消费完了。
finalChannel.basicConsume("queue1", false, new DeliverCallback()
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
package com.hua.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
Work1-开始接受消息
Work1-收到消息是:test:1
Work1-收到消息是:test:11
Work2-开始接受消息
Work2-收到消息是:test:2
Work2-收到消息是:test:3
Work2-收到消息是:test:4
Work2-收到消息是:test:5
Work2-收到消息是:test:6
Work2-收到消息是:test:7
Work2-收到消息是:test:8
Work2-收到消息是:test:9
Work2-收到消息是:test:10
Work2-收到消息是:test:12
Work2-收到消息是:test:13
Work2-收到消息是:test:14
Work2-收到消息是:test:15
Work2-收到消息是:test:16
Work2-收到消息是:test:17
Work2-收到消息是:test:18
Work2-收到消息是:test:19
Work2-收到消息是:test:20
// 此处模拟的是work2的性能更好,出现以上结果,表明公平分发机制,其实就是能者多劳
10. RabbitMQ的使用场景
- 解耦、削峰、异步
①串行方式
将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
②并行方式
将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
存在问题:
1:耦合度高
2:需要自己写线程池自己维护成本太高
3:消息可能会丢失,需要自己做消息补偿
4:如何保证消息的可靠性,需要自己写
5:如果服务器承载不了,需要自己去写高可用
③异步消息队列的方式
好处
1:完全解耦,用MQ建立桥接
2:有独立的线程池和运行模型
3:出现了消息可能会丢失,MQ有持久化功能
4:如何保证消息的可靠性,死信队列和消息转移的等
5:如果服务器承载不了,需要自己去写高可用,HA镜像模型高可用。
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
- 高内聚,低耦合
- 流量的削峰
- 分布式事务的可靠消费和可靠生产
- 索引、缓存、静态化处理的数据同步
- 流量监控
- 日志监控(ELK)
- 下单、订单分发、抢票
11. 整合SpringBoot
11.1 Fanout模式
创建SpringBoot项目,引入web、rabbitmq依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置文件
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 47.94.96.44
port: 5672
生产者工程
service
注意此处发送的消息类型为Object
package com.hua.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "fanout_order_exchange";
// 2: 路由key
private String routeKey = "";
public void makeOrder(Long userId, Long productId, int num) {
// 1:模拟用户下单
String orderNumber = UUID.randomUUID().toString();
// 2: 根据商品id productId 去查询商品的库存
// int numstore = productSerivce.getProductNum(productId);
// 3: 判断库存是否充足
// if(num > numstore ){ return "商品库存不足..."; }
// 4: 下单逻辑
// orderService.saveOrder(order);
// 5: 下单成功要扣减库存
// 6: 下单完成以后
System.out.println("用户 " + userId + ",订单编号是:" + orderNumber);
// 发送订单信息给RabbitMQ fanout
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumber);
}
}
configuration
package com.hua.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
// 1. 声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_order_exchange",true,false);
}
// 2. 声明队列
@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
return new Queue("weixin.fanout.queue", true);
}
// 3. 绑定关系
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding weixinBinding(){
return BindingBuilder.bind(weixinQueue()).to(fanoutExchange());
}
}
发送消息
@SpringBootTest
class RabbitMqSpringBootApplicationTests {
@Autowired
OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder(1l,1l,12);
}
}
消费者工程
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: ip
port: 5672
server:
port: 8081
service
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("接受到了邮箱消息====>"+message);
}
}
@RabbitListener(queues = {"sms.fanout.queue"})
@Service
public class SMSConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("接受到了SMS消息====>"+message);
}
}
@RabbitListener(queues = {"weixin.fanout.queue"})
@Service
public class WeixinConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("接受到了微信消息====>"+message);
}
}
先启动消费者,后启动生产者,结果如下
用户 1,订单编号是:f5aaec54-d746-4281-8732-6d5744c83dd7
接受到了SMS消息====>f5aaec54-d746-4281-8732-6d5744c83dd7
接受到了微信消息====>f5aaec54-d746-4281-8732-6d5744c83dd7
接受到了邮箱消息====>f5aaec54-d746-4281-8732-6d5744c83dd7
11.2 Direct模式
如果先启动消费者,因为交换机、队列、绑定关系都是在生产者声明的,所以会报错,此时也可以在消费者复制一份DirectRabbitMQConfig
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: admin
virtual-host: /
server:
port: 8081
生产者
@Configuration
public class DirectRabbitMQConfig {
// 声明交换机
@Bean
DirectExchange directExchange(){
return new DirectExchange("direct_order_exchange",true,false);
}
// 声明队列
@Bean
Queue emailQueue(){
return new Queue("email.direct.queue",true);
}
@Bean
Queue smsQueue(){
return new Queue("sms.direct.queue",true);
}
@Bean
Queue weixinQueue(){
return new Queue("weixin.direct.queue",true);
}
@Bean
Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
@Bean
Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
Binding weixinBinding(){
return BindingBuilder.bind(weixinQueue()).to(directExchange()).with("weixin");
}
}
@Service
public class DirectOrderService {
@Autowired
RabbitTemplate rabbitTemplate;
public String exchange = "direct_order_exchange";
public String routerKey = "email";
public void makeOrder(){
String orderNumber = UUID.randomUUID().toString();
System.out.println("订单编号"+orderNumber);
rabbitTemplate.convertAndSend(exchange,"email",orderNumber);
rabbitTemplate.convertAndSend(exchange,"weixin",orderNumber);
}
}
@SpringBootTest
class RabbitMqDirectProducerApplicationTests {
@Autowired
DirectOrderService directOrderService;
@Test
void contextLoads() {
directOrderService.makeOrder(); // 发送消息
}
}
消费者
@Service
@RabbitListener(queues = {"email.direct.queue"})
public class DirectEmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("email接收到消息:"+message);
}
}
@Service
@RabbitListener(queues = {"sms.direct.queue"})
public class DirectSmsConsumer {
@RabbitHandler
public void receive(String message){
System.out.println("sms接收到消息"+ message);
}
}
@Service
@RabbitListener(queues = {"weixin.direct.queue"})
public class DirectWeixinConsumer {
@RabbitHandler
public void receive(String message){
System.out.println("微信收到消息:"+message);
}
}
11.3 Topic模式
使用注解的方式
消费者
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.sms.#"
))
public class TopicSmsConsumer {
@RabbitHandler
public void receive(String message){
System.out.println("sms接收到消息"+ message);
}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "weixin.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.weixin.#"
))
public class TopicWeixinConsumer {
@RabbitHandler
public void receive(String message){
System.out.println("微信收到消息:"+message);
}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
public class TopicEmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("email接收到消息:"+message);
}
}
生产者
@Service
public class TopicOrderService {
@Autowired
RabbitTemplate rabbitTemplate;
private String exchange = "topic_order_exchange";
private String routeKey = "com.email.com";
public void makeOrder(){
String orderNo = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(exchange,routeKey,orderNo);
}
}
发送消息
@SpringBootTest
class RabbitMqDirectProducerApplicationTests {
@Autowired
TopicOrderService topicOrderService;
@Test
void contextLoads() {
topicOrderService.makeOrder();
}
}
12. 消息过期时间
12.1 队列的属性进行设置
如果两种方法同时使用,则消息的过期时间以最小的TTL为准
该种方式设置的消息在队列的生存时间一旦超过设置的TTL值,就被放到死信队列,消费者将无法收到该消息,而单个设置消息的过期时间,超时之后就会不复存在。
队列的所有消息有统一的过期时间
@Configuration
public class TTLRabbitMQConfig {
@Bean
DirectExchange directExchange(){
return new DirectExchange("TTL_direct_exchange",true,false);
}
@Bean
Queue ttlQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
return new Queue("TTL_direct_queue",true,false,false,args);
}
@Bean
Binding binding(){
return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
}
}
public void ttlMakeOrder(){
String orderNumber = UUID.randomUUID().toString();
System.out.println("订单编号"+orderNumber);
rabbitTemplate.convertAndSend("TTL_direct_exchange","ttl",orderNumber);
}
@Test
void contextLoads() {
directOrderService.ttlMakeOrder();
}
12.2 消息过期时间
单独为某一个消息设置过期时间
@Configuration
public class TTLRabbitMQConfig {
@Bean
DirectExchange directExchange(){
return new DirectExchange("TTL_direct_exchange",true,false);
}
@Bean
Queue ttlMessageQueue(){
return new Queue("TTL_Message_queue",true,false,false,null);
}
@Bean
Binding bindingMessage(){
return BindingBuilder.bind(ttlMessageQueue()).to(directExchange()).with("ttl_message");
}
}
public void ttlMessage(){
String orderNumber = UUID.randomUUID().toString();
System.out.println("订单编号"+orderNumber);
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("utf-8");
return message;
}
};
rabbitTemplate.convertAndSend("TTL_direct_exchange","ttl_message",orderNumber,processor);
}
@Test
void contextLoads() {
directOrderService.ttlMessage();
}
13. 死信队列
DLX,全称为Dead-Letter-Exchange,死信交换机,当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
@Configuration
public class DeadRabbitMQConfig {
// 死信交换机
@Bean
DirectExchange deadExchange(){
return new DirectExchange("dead_direct_exchange",true,false);
}
// 死信队列
@Bean
Queue deadQueue(){
return new Queue("deadQueue",true);
}
// 绑定
@Bean
Binding deadBind(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
}
@Configuration
public class TTLRabbitMQConfig {
@Bean
DirectExchange directExchange1(){
return new DirectExchange("TTL_direct_exchange",true,false);
}
@Bean
Queue ttlQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000);
args.put("x-dead-letter-exchange","dead_direct_exchange");
args.put("x-dead-letter-routing-key","dead"); // fanout模式不需要配置
return new Queue("TTL_direct_queue",true,false,false,args);
}
@Bean
Binding binding(){
return BindingBuilder.bind(ttlQueue()).to(directExchange1()).with("ttl");
}
}
public void ttlMakeOrder(){
String orderNumber = UUID.randomUUID().toString();
System.out.println("订单编号"+orderNumber);
rabbitTemplate.convertAndSend("TTL_direct_exchange","ttl",orderNumber);
}
@Test
void contextLoads() {
directOrderService.ttlMakeOrder();
}
这里可能报错,原因是以前创建过这个队列,又设置了新的参数,不会更新的
// 设置队列的最大长度
args.put("x-max-length",10);
14. 内存磁盘监控
内存控制
- 命令方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接。
通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。
- 配置文件方式 /etc/rabbitmq/rabbitmq.conf
# 默认
# vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction,建议取值在0.4~0.7之间,不建议超过0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB
内存换页
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阈值是50%时就会换页处理。也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中。从而达到稳健的运行。
可以修改vm_memory_high_watermark_paging_ratio的值
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)
磁盘预警
当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间剩50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。
这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。
# 命令方式
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定单位 KB MB GB
fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)
# 配置文件方式
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
15. 集群
前提是将RabbitMQ服务停止
# 启动第一个节点rabbit-1
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
# 启动第二个节点rabbit-2
# 注意这里需要修改端口号,不能与上边的一样
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
# 将rabbit-1设置为主节点
# 停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
# 启动应用
sudo rabbitmqctl -n rabbit-1 start_app
# 将rabbit-2设置为从节点
# 停止应用
sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-2 reset
# 将rabbit-2节点加入到rabbit-1(主节点)集群当中
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@summer1245 # 此处summer1245是主机名
# 启动应用
sudo rabbitmqctl -n rabbit-2 start_app
# 查看集群状态
sudo rabbitmqctl cluster_status -n rabbit-1
注意在访问的时候:web结面的管理需要给15672 node-1 和15673的node-2 设置用户名和密码。如下
rabbitmq-plugins enable rabbitmq_management # 访问公网ip+15672/15673 记得添加阿里云安全组
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
主节点挂掉,从节点是无法启动的
如果采用多机部署方式,需读取其中一个节点的cookie,并复制到其他节点(节点之间通过cookie确定相互是否可通信)。cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主机名分别为rabbit-1、rabbit-2
1、逐个启动各节点
2、配置各节点的hosts文件( vim /etc/hosts)
ip1:rabbit-1
ip2:rabbit-2
其它步骤雷同单机部署方式
16. 分布式事务
分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性
实现方式
- 两阶段提交(2pc)
两阶段提交(Two-phase Commit,2PC),需要数据库厂商的支持,java组件有atomikos等,通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
①准备阶段
协调者询问参与者事务是否执行成功,参与者发回事务执行结果。
②提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。
- 补偿事务(TCC)
严选,阿里,蚂蚁金服。
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:
- Try 阶段主要是对业务系统做检测及资源预留
- Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 – – – Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
- 本地消息表(异步确保)
支付宝、微信支付主动查询支付状态,对账单的形式
本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。
- 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
- 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
- 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。
- MQ 事务消息
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。
以RabbitMQ 中间件为例,其思路大致为:
-
第一阶段Prepared消息,会拿到消息地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
-
在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。