RabbitMQ 笔记

  • Post author:
  • Post category:其他


文章目录



课程介绍



课程目的

  1. 了解消息中间件背景知识、使用场景、发展等
  2. 掌握 RabbitMQ、RocketMQ、Kafka 这三款主流的消息中间件的架构、模型和使用(开发、 安装、集群部署、运维、监控等)
  3. 掌握消息的可靠性、幂等性、顺序消息、延迟消息、事务消息等进阶的知识,以及大规模生产 环境中的使用经验,轻松应对各种复杂的业务场景
  4. 掌握顶级开源消息中间件核心源码,理解其背后的架构设计思想以及在高性能存储系统、网络 编程等方面的技巧(会涉及网络通信、操作系统等底层知识)
  5. 理解主流消息中间件的优缺点,具备技术选型能力
  6. 让你无论是在日后的工作还是面试求职中遇到消息中间件相关问题都能轻松应对



课程主要内容


消息中间件概述:

分布式系统中如何进行远程通信

为什么要使用消息中间件?市场上有哪些产品?有什么优缺点?该用哪个

JMS 规范和 AMQP 协议


RabbitMQ 部分:

RabbitMQ 架构、环境准备和整合

高级特性如消息的可靠性保障、死信队列、延迟队列等

RabbitMQ 的集群、运维

源码分析,解析 RabbitMQ 的启动过程、交换器的实现、队列的实现等



第一部分:消息中间件概述



第 1 节 分布式架构通信



1.1 分布式架构通信原理


SOA 架构:

image.png
2

根据实际业务,把系统拆分成合适的、独立部署的模块,模块之间相互独立。 优点:分布式、松耦合、扩展灵活、可重用。

SOA 架构系统中,使用 Dubbo 和 Zookeeper 进行服务间的远程通信。

优点: Dubbo 使用自定义的 TCP 协议,可以让请求报文体积更小,或者使用 HTTP2 协议,也可以减少报文 的体积,提高传输效率。


微服务架构:

image.png

SpringCloud 中使用 Feign 解决服务之间远程通信的问题。

Feign:轻量级 RESTful 的 HTTP 服务客户端,广泛应用于 Spring Cloud 中。

符合面向接口化的编程 习惯。

本质:封装了 HTTP 调用流程,类似 Dubbo 的服务调用。

多用于同步远程调用。

RPC 主要基于 TCP/UDP 协议,HTTP 协议是应用层协议,是构建在传输层协议 TCP 之上的,RPC 效率 更高

RPC 长连接:不必每次通信都像 HTTP 一样三次握手,减少网络开销;

HTTP 服务开发迭代更快:在接口不多,系统与系统之间交互比较少的情况下,HTTP 就显得更加方 便;相反,在接口比较多,系统与系统之间交互比较多的情况下,HTTP 就没有 RPC 有优势。



1.2 分布式同步通信的问题

电商项目中,如果后台添加商品信息,该信息放到数据库。 我们同时,需要更新搜索引擎的倒排索引 同时,假如有商品页面的静态化处理,也需要更新该页面信息

image.png

方式一、可以在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法, 接着调用更新静态化页面的方法。

代码应该是:

Long goodsId = addGoods(goods); if (goodsId != null) {

refreshInvertedIndex(goods);

refreshStaticPage(goods); }

问题: 假如更新倒排索引失败,该怎么办? 假如更新静态页面失败怎么办?

解决方式:

如果更新倒排索引失败,重试

如果更新静态页面失败,重试

代码应该是这样:

public Long saveGoods() {

Long goodsId = addGoods(goods); 
if (goodsId != null) { 
// 调用递归的方法,实现重试 
boolean indexFlag = refreshInvertedIndex(goods);
 // 调用递归的方法,实现重试 
boolean pageFlag = refreshStaticPage(goods); }

}

private boolean refreshInvertedIndex(Goods goods) {

// 调用服务的方法 
boolean flag = indexService.refreshIndex(goods);
 if (!flag) { refreshInvertedIndex(goods); }

}

private boolean refreshStaticPage(Goods goods) {

// 调用服务的方法
 boolean flag = staticPageService.refreshStaticPage(goods); 
if (!flag) { refreshStaticPage(goods); }

}




以上代码在执行中的问题:

  1. 如果相应的更新一直失败,岂不是一直死循环直到调用栈崩溃?
  2. 如果相应的更新一直在重试,在重试期间,添加商品的方法调用是不是一直阻塞中?
  3. 如果添加商品的时候并发量很大,效率会很低?

或许可以加上迭代的等待时间,迭代的次数加以限制,减少 CPU 消耗。

或许还可以加上多线程,同时执行更新的操作,减少执行的时间。

但是都是基于该调用一定在可见的时间内调用成功。

还是老问题:如果更新失败怎么办?

归根到底,是同步调用处理不当。这个问题在分布式架构中尤为严重。

方式二:可以先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一 个公共的位置,然后由相应的服务从该位置获取任务来执行。

Long goodsId = addGoods(goods); 
if (goodsId != null)
 { goodsTaskService.cache(goods); }

此时,由于添加商品仅仅是将数据插入数据库,然后将任务信息缓存,调用立刻返回。

对于添加商品方法的调用,不会存在线程阻塞,不会存在调用栈崩溃。

再考虑远一点。

由于更新倒排索引的的服务和更新静态页面的服务要从公共的缓存或者叫任务池中取出任务并执 行,它们也会有执行失败的问题,也需要重试。如果一直更新失败,也需要一个方式来处理。 比如如果更新失败,则每隔 3 秒钟重试一次,重试三次都失败则放弃执行。 然后将错误结果放到另一个公共的地方,等待后续的补偿,无论是手工还是自动的。

还有问题:

  1. 这个公共的任务池,会不会宕机?会不会服务不可用?如何解决?
  2. 你一定确信消息发送到任务池了吗?
  3. 如果在向任务池发送任务失败该如何处理?
  4. 如果重试的时候发送成功了,但是实际上发送了多次,更新倒排索引服务和更新静态页面服务 会不会重复执行?
  5. 如果重复执行,最终结果会不会不一样?

看来真是解决了一个问题,引进来三个问题。 如果上述的问题都由我们从 0 开始解决,开发难度可想而知。

分布式服务中,由于业务拆分,应用也需要拆分,甚至数据库分库分表。 但是完成一个业务处理,往往要设计到多个模块之间的协调处理。此时模块之间,服务与服务之间 以及客户端与服务端之间的通信将变得非常复杂。



1.3 分布式异步通信模式

image.png

比较典型的“生产者消费者模式”,可以跨平台、支持异构系统,通常借助消息中间件来完成。

优点:系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹 性。

服务解耦、流量削峰填谷等

缺点:消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。

使用异步消息模式需要注意的问题:

  1. 哪些业务需要同步处理,哪些业务可以异步处理?
  2. 如何保证消息的安全?消息是否会丢失,是否会重复?
  3. 请求的延迟如何能够减少?
  4. 消息接收的顺序是否会影响到业务流程的正常执行?
  5. 消息处理失败后是否需要重发?如果重发如何保证幂等性?



第 2 节 消息中间件简介



2.1 消息中间件概念

维基百科对消息中间件的解释:面向消息的系统(消息中间件)是在分布式系统中完成消息的发送 和接收的基础软件。

消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基 于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进 程的通信。

消息中间件就是在通信的上下游之间截断:break it,Broker

然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。

体会一下:“必有歹人从中作梗”,”定有贵人从中相助“

异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中

间件。

image.png



2.2 自定义消息中间件

并发编程领域经典面试题:请使用 java 代码来实现“生产者消费者模式”。

BlockingQueue(阻塞队列)是 java 中常见的容器,在多线程编程中被广泛使用。

当队列容器已满时生产者线程被阻塞,直到队列未满后才可以继续 put;

当队列容器为空时,消费者线程被阻塞,直至队列非空时才可以继续 take。

image.png

provider

package com.galaxy;

import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午3:00
 */
public class Provider implements Runnable {

    private BlockingQueue<Mask> queue;

    public Provider(BlockingQueue<Mask> queue){
        this.queue = queue;
    }

    private int index;

    @Override
    public void run() {
        while (true){

            try {
                Thread.sleep(500);
                if (queue.remainingCapacity()<=0){

                    System.out.println("口罩生产仓库已满!");
                }
                else {
                    Mask mask = new Mask();
                    mask.setId(index++);
                    mask.setType("N95");
                    System.out.println("正在生产口罩id: " + (index - 1));
                    queue.put(mask);
                    System.out.println("仓库口罩个数" + queue.size());
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

consumer

package com.galaxy;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午3:00
 */
public class Consumer implements Runnable {

    private BlockingQueue<Mask> queue;

    public Consumer(BlockingQueue<Mask> queue){
        this.queue = queue;
    }


    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(200);
                Mask mask = queue.take();
                System.out.println("正在出售口罩的ID"+mask.getId()+"口罩的类型"+mask.getType());


            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

mask

package com.galaxy;

/**
 * @author lane
 * @date 2021年08月15日 下午3:05
 */
public class Mask {

    private Integer id;
    private String type;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "Mask{" +
                "id=" + id +
                ", type='" + type + '\'' +
                '}';
    }
}

sale

package com.galaxy;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

/**
 * @author lane
 * @date 2021年08月15日 下午4:00
 */
public class Sale {
    public static void main(String[] args) {
        BlockingQueue<Mask> queue = new ArrayBlockingQueue<>(20);

        new Thread(new Provider(queue)).start();

        new Thread(new Consumer(queue)).start();

    }


}

result

正在生产口罩id: 0
仓库口罩个数1
正在出售口罩的ID0口罩的类型N95
正在生产口罩id: 1
仓库口罩个数1
正在出售口罩的ID1口罩的类型N95
正在生产口罩id: 2
仓库口罩个数1
正在出售口罩的ID2口罩的类型N95

上述代码放到生产环境显然是不行的

比如:没有集群,没有分布式,玩儿法太单一,不能满足企业 级应用的要求。。。

比如:消息有没有持久化? 怎么确定消息一定能发送成功? 怎么确定消息一定能被消费成功? 高并发下的性能怎么样? 系统可靠吗? 有没有 Pub/Sub 模式? 有没有考虑过限流?



2.3 主流消息中间件及选型

在传统金融机构、银行、政府机构等有一些老系统还在使用 IBM 等厂商提供的商用 MQ 产品。

当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、 ZeroMQ 等,其中应用最为广泛的要数 RabbitMQ、RocketMQ、Kafka 这三款。

Redis 在某种程度上也可以是实现类似“Queue”和“Pub/Sub”的机制,严格意义上不算消息中间件。

image.png

image.png

image.png


选取原则

首先,产品应该是开源的。开源意味着如果队列使用中遇到 bug,可以很快修改,而不用等待开发 者的更新。

其次,产品必须是近几年比较流行的,要有一个活跃的社区。这样遇到问题很快就可以找到解决方 法。同时流行也意味着 bug 较少。流行的产品一般跟周边系统兼容性比较好。

最后,作为消息队列,要具备以下几个特性:

1、消息传输的可靠性:保证消息不会丢失。

2、支持集群,包括横向扩展,单点故障都可以解决。

3、性能要好,要能够满足业务的性能需求。


RabbitMQ

RabbitMQ 开始是用在电信业务的可靠通信的,也是少有的几款支持 AMQP 协议的产品之一。

优点:

  1. 轻量级,快速,部署使用方便
  2. 支持灵活的路由配置。RabbitMQ 中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
  3. RabbitMQ 的客户端支持大多数的编程语言。

缺点

  1. 如果有大量消息堆积在队列中,性能会急剧下降
  2. RabbitMQ 的性能在 Kafka 和 RocketMQ 中是最差的,每秒处理几万到几十万的消息。如果应 用要求高的性能,不要选择 RabbitMQ。
  3. RabbitMQ 是 Erlang 开发的,功能扩展和二次开发代价很高。


RocketMQ

RocketMQ 是一个开源的消息队列,使用 java 实现。借鉴了 Kafka 的设计并做了很多改进。

RocketMQ 主要用于有序,事务,流计算,消息推送,日志流处理,binlog 分发等场景。

经过了历次的 双 11 考验,性能,稳定性可可靠性没的说。

RocketMQ 几乎具备了消息队列应该具备的所有特性和功能。 java 开发,阅读源代码、扩展、二次开发很方便。

对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时 间,可以使用 RocketMQ。

性能比 RabbitMQ 高一个数量级,每秒处理几十万的消息。

缺点: 跟周边系统的整合和兼容不是很好。


Kafka

Kafka 的可靠性,稳定性和功能特性基本满足大多数的应用场景。 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka。

Kafka 高效,可伸缩,消息持久化。支持分区、副本和容错。

Kafka 是 Scala 和 Java 开发的,对批处理和异步处理做了大量的设计,因此 Kafka 可以得到非常高的 性能。它的异步消息的发送和接收是三个中最好的,但是跟 RocketMQ 拉不开数量级,每秒处理几十万 的消息。

如果是异步消息,并且开启了压缩,Kafka 最终可以达到每秒处理 2000w 消息的级别。

但是由于是异步的和批处理的,延迟也会高,不适合电商场景。

特点 RabbitMQ RocketMQ Kafka
单机吞吐量 1w 量级 10w 量级 10w 量级
开发语言 Erlang Java Java 和 Scala
消息延迟 微秒 毫秒 毫秒
消息丢失 可能性很低 参数优化后可以 0 丢失 参数优化后可以 0 丢失
消费模式 推拉 推拉 拉取
主题数量对吞吐量 的影响 \ 几百上千个主题会对吞吐量有 一个小的影响 几十上百个主题会极大 影响吞吐量
可用性 高(主从) 很高(主从) 很高(分布式)

如果对于吞吐量要求比较高 10 万 +

RocketMQ、Kafka

如果对于消息延时要求比较高

RabbitMQ、RocketMQ

如果对于开发语言要求自己扩展

RocketMQ、Kafka

如果消息丢失要求高

RocketMQ、Kafka

如果对于整合兼容要求比较高

RabbitMQ、Kafka

如果偏向于电商

RocketMQ

如果偏向于 Spring Cloud 系列整合

RabbitMQ

如果偏向与大数据

Kafka



2.4 消息中间件应用场景

消息中间件的使用场景非常广泛,比如,12306 购票的排队锁座,电商秒杀,大数据实时计算等。

电商秒杀案例:

比如 6.18,活动从 0:00 开始,仅限前 200 名,秒杀即将开始时,用户会疯狂刷新 APP 或者浏览器来 保证自己能够尽早的看到商品。

当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?

在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发 的写请求呢?


系统应该如何应对高并发的读请求

使用缓存策略将请求挡在上层中的缓存中

能静态化的数据尽量做到静态化

加入限流(比如对短时间之内来自某一个用户,某一个 IP、某个设备的重复请求做丢弃处理)


系统应该如何应对高并发的写请求

生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s 内,有 1 万个数据连接同 时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用 消息队列。


消息队列的作用:

削去秒杀场景下的峰值写流量——流量削峰

通过异步处理简化秒杀请求中的业务流程——异步处理

解耦,实现秒杀系统模块之间松耦合——解耦


削去秒杀场景下的峰值写流量

将秒杀请求暂存于消息队列

业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求。

削峰填谷

削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀用户对于短暂延迟有一定 容忍度。

秒杀商品有 1000 件,处理一次购买请求的时间是 500ms,那么总共就需要 500s 的时间。这时你 部署 10 个队列处理程序,那么秒杀请求的处理时间就是 50s,也就是说用户需要等待 50s 才可以看到 秒杀的结果,这是可以接受的。这时会并发 10 个请求到达数据库,并不会对数据库造成很大的压力。

通过异步处理简化秒杀请求中的业务流程

先处理主要的业务,异步处理次要的业务。 如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积 分。 此时秒杀只要处理生成订单,扣减库存的耗时,发放优惠券、增加用户积分异步去处理了。

解耦,实现秒杀系统模块之间松耦合

将秒杀数据同步给数据团队,有两种思路:

  1. 使用 HTTP 或者 RPC 同步调用,即提供一个接口,实时将数据推送给数据服务。 系统的耦合度高,如果其中一个服务有问题,可能会导致另一个服务不可用。
  2. 使用消息队列 将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理。

拉勾 B 端 C 端数据同步案例:

拉勾网站分 B 端和 C 端,B 端面向企业用户,C 端面向求职者。 这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。

但是各自又需要对方的数据,需要共享:如

  1. 当 C 端求职者在更新简历之后,B 端企业用户如何尽早看到该简历更新?
  2. 当 B 端企业用户发布新的职位需求后,C 端用户如何尽早看到该职位信息?

无论是 B 端还是 C 端,都有各自的搜索引擎和缓存,B 端需要获取 C 端的更新以更新搜索引擎和缓 存;C 端需要获取 B 端的更新以更新 C 端的搜索引擎与缓存。 如何解决 B 端 C 端数据共享的问题?

解决方式:

  1. 同步方式:B 端和 C 端通过 RPC 或 WebService 的方式发布服务,让对方来调用,以获取对方的 信息。求职者每更新一次简历,就调用一次 B 端的服务,进行数据的同步;B 端企业用户每更 新职位需求,就调用 C 端的服务,进行数据的同步。
  2. 异步方式:使用消息队列,B 端将更新的数据发布到消息队列,C 端将更新的数据发布到消息 队列,B 端订阅 C 端的消息队列,C 端订阅 B 端的消息队列。

使用同步方式,B 端和 C 端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可 用。比如 C 端的 RPC 挂掉,企业用户有可能无法发布新的职位信息,因为发布了对方也看不到;B 端的 RPC 挂掉,求职者可能无法更新简历,因为即使简历更新了,对方也看不到。

可以让 B 端或 C 端在对方 RPC 挂掉的时候,先将该通知消息缓存起来,等对方服务恢复 之后再进行同步。

这正是引入异步方式,使用消息队列的目的。

使用消息队列的异步方式,对 B 端 C 端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓 存数据。


支付宝购买电影票

image.png

如上图,用户在支付宝购买了一张电影票后很快就收到消息推送和短信(电影院地址、几号厅、座 位号、场次时间等),同时用户会积累一定的会员积分。

这里,交易系统并不需要一直等待消息送达等动作都完成后才返回成功,允许一定延迟和瞬时不一 致(最终一致性),而且后面两个动作通常可以并发执行。

如果后期监控大盘想要获取实时交易数据,只需要新增个消费者程序并订阅该消息即可,交易系统 对此并不感知,松耦合。



第 3 节 JMS 规范和 AMQP 协议



3.1 JMS 经典模式详解

JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间 件(MOM,Message oriented Middleware)的 API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的 API,绝大多数 MOM 提供商都支持。 它类似于 JDBC(Java Database Connectivity)。



3.1.1 JMS 消息

消息是 JMS 中的一种类型对象,由两部分组成:报文头和消息主体。

报文头包括消息头字段和消息头属性。

字段是 JMS 协议规定的字段,属性可以由用户按需添加。 JMS 报文头全部字段:

image.png

消息主体则携带着应用程序的数据或有效负载。

根据有效负载的类型来划分,可以将消息分为几种类型:

  1. 简单文本(TextMessage)
  2. 可序列化的对象(ObjectMessage)
  3. 属性集合(MapMessage)
  4. 字节流(BytesMessage)
  5. 原始值流(StreamMessage)
  6. 无有效负载的消息(Message)。


3.1.2 体系架构

JMS 由以下元素组成:

  1. JMS 供应商产品

JMS 接口的一个实现。该产品可以是 Java 的 JMS 实现,也可以是非 Java 的面向消息中间件的适 配器。

  1. JMS Client

生产或消费基于消息的 Java 的应用程序或对象。

  1. JMS Producer

创建并发送消息的 JMS 客户。

  1. JMS Consumer

接收消息的 JMS 客户。

  1. JMS Message

包括可以在 JMS 客户之间传递的数据的对象

  1. JMS Queue

缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列 中移除。

  1. JMS Topic

Pub/Sub 模式。



3.1.3 对象模型
  1. ConnectionFactory 接口(连接工厂)

用户用来创建到 JMS 提供者的连接的被管对象。JMS 客户通过可移植的接口访问连接,这样当 下层的实现改变时,代码不需要进行修改。管理员在 JNDI 名字空间中配置连接工厂,这样, JMS 客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接 工厂。

  1. Connection 接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与 JMS 提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题 到目标。

  1. Destination 接口(目标)

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是 队列,或者是主题。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一 样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

  1. Session 接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的, 就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事 务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用 户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者 来接收消息。

  1. MessageConsumer 接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻 塞)接收队列和主题类型的消息。

  1. MessageProducer 接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个 通用的发送者,在发送消息时指定目标。

  1. Message 接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个 消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创 建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节 消息,流消息和对象消息)。

image.png



3.1.4 模式

Java 消息服务应用程序结构支持两种模式:

  1. 点对点也叫队列模式
  2. 发布/订阅模式


在点对点或队列模型

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为:

一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。

image.png


发布/订阅模式

支持向一个特定的主题发布消息。 0 或多个订阅者可能对接收特定消息主题的消息感兴趣。 发布者和订阅者彼此不知道对方。 多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性。

发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。

image.png



3.1.5 传递方式

JMS 有两种传递消息的方式。

标记为 NON_PERSISTENT 的消息最多投递一次,而标记为 PERSISTENT 的消息将使用暂存后再转送 的机理投递。

如果一个 JMS 服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非 持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。



3.1.6 供应商

开源软件:

  1. Apache ActiveMQ
  2. RabbitMQ
  3. RocketMQ
  4. JBoss 社区所研发的 HornetQ
  5. Joram
  6. Coridan 的 MantaRay
  7. The OpenJMS Group 的 OpenJMS

专有的供应商包括:

  1. BEA 的 BEA WebLogic Server JMS
  2. TIBCO Software 的 EMS
  3. GigaSpaces Technologies 的 GigaSpaces
  4. Softwired 2006 的 iBus
  5. IONA Technologies 的 IONA JMS
  6. SeeBeyond 的 IQManager(2005 年 8 月被 Sun Microsystems 并购)
  7. webMethods 的 JMS±
  8. my-channels 的 Nirvana
  9. Sonic Software 的 SonicMQ
  10. SwiftMQ 的 SwiftMQ
  11. IBM 的 WebSphere MQ



3.2 JMS 在应用集群中的问题

生产中应用基本上都是以集群部署的。在 Queue 模式下,消息的消费没有什么问题,因为不同节点 的相同应用会抢占式地消费消息,这样还能分摊负载。

如果使用 Topic 广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操 作,这样就重复消费了。。。

image.png

image.png

方案一:选择 Queue 模式,创建多个一样的 Queue,每个应用消费自己的 Queue。

弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。

方案二:选择 Topic 模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。

弊端:对业务侵入较大,不是优雅的解决方法。

ActiveMQ 通过“虚拟主题”解决了这个问题。 生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不 同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。


JMS 规范文档 3.3 AMQP 协议剖析

(jms-1_1-fr-spec.pdf)下载地址:


https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/

JMS 是 JEE 平台的标准消息传递 API。它可以在商业和开源实现中使用。每个实现都包括一个 JMS 服 务器,一个 JMS 客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS 提供程序可以是 消息传递服务的独立实现,也可以是非 JMS 消息传递系统的桥梁。

JMS 客户端 API 是标准化的,因此 JMS 应用程序可在供应商的实现之间移植。但是:

  1. 底层消息传递实现未指定,因此 JMS 实现之间没有互操作性。除非存在桥接技术,否则想要共 享消息传递的 Java 应用程序必须全部使用相同的 JMS 实现。
  2. 如果没有供应商特定的 JMS 客户端库来启用互操作性,则非 Java 应用程序将无法访问 JMS。
  3. AMQP 0-9-1 是一种消息传递协议,而不是像 JMS 这样的 API。任何实现该协议的客户端都可以 访问支持 AMQP 0-9-1 的代理。
  4. 协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的 AMQP 0-9-1 客户端都 可以参与消息传递系统,而无需桥接不兼容的服务器实现。



3.3 AMQP 协议剖析



3.3.1 协议架构

AMQP 全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容 JMS 协议。目前 RabbitMQ 主流支持 AMQP 0-9-1,3.8.4 版本支持 AMQP 1.0。

image.png

image.png



3.3.2 AMQP 中的概念

Publisher:消息发送者,将消息发送到 Exchange 并指定 RoutingKey,以便 queue 可以接收到指 定的消息。

Consumer:消息消费者,从 queue 获取消息,一个 Consumer 可以订阅多个 queue 以从多个 queue 中接收消息。

Server:一个具体的 MQ 服务实例,也称为 Broker。

Virtual host:虚拟主机,一个 Server 下可以有多个虚拟主机,用于隔离不同项目,一个 Virtual host 通常包含多个 Exchange、Message Queue。

Exchange:交换器,接收 Producer 发送来的消息,把消息转发到对应的 Message Queue 中。

Routing key:路由键,用于指定消息路由规则(Exchange 将消息路由到具体的 queue 中),通 常需要和具体的 Exchange 类型、Binding 的 Routing key 结合起来使用。

Bindings:指定了 Exchange 和 Queue 之间的绑定关系。Exchange 根据消息的 Routing key 和 Binding 配置(绑定关系、Binding、Routing key 等)来决定把消息分派到哪些具体的 queue 中。这依 赖于 Exchange 类型。

Message Queue:实际存储消息的容器,并把消息传递给最终的 Consumer。



3.3.3.AMQP 传输层架构


简要概述

AMQP 是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信 息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。

我们假定有一个可靠的面向流的网络传输层(TCP/IP 或等价的协议)。

在一个单一的 socket 连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用 通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序 列传输。

我们使用小的数据类型来构造数据帧,如 bit,integer,string 以及字段表。数据帧的字段做了轻微 的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。

线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是 AMQP)。我们假定 AMQP 会扩展,改进以及随时间的其他变化,并要求 wire-level 格式支持这些变化。


数据类型

AMQP 使用的数据类型如下:

Integers(数值范围 1-8 的十进制数字):用于表示大小,数量,限制等,整数类型无符号 的,可以在帧内不对齐。 Bits(统一为 8 个字节):用于表示开/关值。

Short strings:用于保存简短的文本属性,字符串个数限制为 255,8 个字节

Long strings:用于保存二进制数据块。

Field tables:包含键值对,字段值一般为字符串,整数等。


协议协商

AMQP 客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选 项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一 个很有用的技术手段,因为它可以让我们断言假设和前置条件。

在 AMQP 中,我们需要协商协议的一些特殊方面:

1、 真实的协议和版本。服务器可能在同一个端口支持多个协议。

2、 双方的加密参数和认证方式。这是功能层的一部分。

3、 数据帧最大大小,通道数量以及其他操作限制。

对限制条件的认同可能会导致双方重新分配 key 的缓存,避免死锁。每个发来的数据帧要么遵守认 同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一 切工作正常,要么完全不工作”的 RabbitMQ 哲学。

协商双方认同限制到一个小的值,如下:

  1. 服务端必须告诉客户端它加上了什么限制。
  2. 客户端响应服务器,或许会要求对客户端的连接降低限制。


数据帧界定

TCP/IP 是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:

  1. 每个连接发送单一数据帧。简单但是慢。
  2. 在流中添加帧的边界。简单,但是解析很慢。
  3. 计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP 的选择。


3.3.4 AMQP 客户端实现 JMS 客户端

RabbitMQ 的 JMS 客户端用 Java 实现,既与 JMS API 兼容,也与 AMQP 0-9-1 协议兼 容。

局限性

RabbitMQ JMS 客户端不支持某些 JMS 1.1 功能:

JMS 客户端不支持服务器会话。

XA 事务支持接口未实现。

RabbitMQ JMS 主题选择器插件支持主题选择器。

队列选择器尚未实现。

支持 RabbitMQ 连接的 SSL 和套接字选项,但仅使用 RabbitMQ 客户端提供的(默认)SSL 连接 协议。

RabbitMQ 不支持 JMS NoLocal 订阅功能,该功能禁止消费者接收通过消费者自己的连接发布 的消息。可以调用包含 NoLocal 参数的方法,但该方法将被忽略。

RabbitMQ 使用 amqp 协议,JMS 规范仅对于 Java 的使用作出的规定,跟其他语言无关,协议是语言 无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。

AMQP 协议文档下载地址:


https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip



第二部分:RabbitMQ



第 1 节 RabbitMQ 架构与实战



1.1 RabbitMQ 介绍、概念、基本架构



1.1.1 RabbitMQ 介绍

RabbitMQ,俗称“兔子 MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管 是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。

  1. 高可靠性、易扩展、高可用、功能丰富等
  2. 支持大多数(甚至冷门)的编程语言客户端。
  3. RabbitMQ 遵循 AMQP 协议,自身采用 Erlang(一种由爱立信开发的通用面向并发编程的语 言)编写。
  4. RabbitMQ 也支持 MQTT 等其他协议。

RabbitMQ 具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:


https://www.rabbitmq.com/community-plugins.html



1.1.2 RabbitMQ 整体逻辑架构

image.png



1.1.3 RabbitMQ Exchange 类型

RabbitMQ 常用的交换器类型有:fanout、direct、topic、headers 四种


Fanout

会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:

image.png


Direct

direct 类型的交换器路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的 队列中,如下图:

image.png


Topic

topic 类型的交换器在 direct 匹配规则上进行了扩展,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,这里的匹配规则稍微不同,它约定:

BindingKey 和 RoutingKey 一样都是由”.”分隔的字符串;BindingKey 中可以存在两种特殊字符“*

”和 “#”,用于模糊匹配,其中”*

“用于匹配一个单词,”#”用于匹配多个单词(可以是 0 个)。

image.png


Headers

headers 类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时, RabbitMQ 会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键 值对,如果匹配,消息就会路由到该队列。headers 类型的交换器性能很差,不实用。



1.1.4 RabbitMQ 数据存储


存储机制

RabbitMQ 消息有两种类型:

  1. 持久化消息和非持久化消息。
  2. 这两种消息都会被写入磁盘。

持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清 除。这会提高一定的性能。

非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。

RabbitMQ 存储层包含两个部分:队列索引和消息存储。

image.png

队列索引:rabbit_queue_index

索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者 ack 等。

每个队列都有相对应的索引。

image.png

索引使用顺序的段文件来存储,后缀为.idx,文件名从 0 开始累加,每个段文件中包含固定的 segment_entry_count 条记录,默认值是 16384。每个 index 从磁盘中读取消息的时候,至少要在内存 中维护一个段文件,所以设置 queue_index_embed_msgs_below 值得时候要格外谨慎,一点点增大也 可能会引起内存爆炸式增长。

image.png

image.png

消息存储:rabbit_msg_store

消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一 个。

存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。

持久化存 储的内容在 broker 重启后不会丢失,短暂存储的内容在 broker 重启后丢失。

store 使用文件来存储,后缀为.rdq,经过 store 处理的所有消息都会以追加的方式写入到该文件 中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新 的消息写入。文件名从 0 开始进行累加。在进行消息的存储时,RabbitMQ 会在 ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。

image.png

image.png

消息(包括消息头、消息体、属性)可以直接存储在 index 中,也可以存储在 store 中。最佳的方式 是较小的消息存在 index 中,而较大的消息存在 store 中。这个消息大小的界定可以通过 queue_index_embed_msgs_below 来配置,默认值为 4096B。当一个消息小于设定的大小阈值时,就 可以存储在 index 中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否 则放到持久化消息文件中。

rabbitmq.conf 中的配置信息:

# queue_index_embed_msgs_below = 4096
# queue_index_embed_msgs_below = 4kb

如果消息小于这个值,就在索引中存储,如果消息大于这个值就在 store 中存储:

大于这个值的消息存储于 msg_store_persistent 目录中的 .rdq 文件中:

image.png

小于这个值的消息存储于 .idx 索引文件中:

image.png

读取消息时,先根据消息的 ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直 接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由 store 进行处 理。45

删除消息时,只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。 在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记 为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效 数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有 3 个文件存在的情况下)的 数据大小的比值超过设置的阈值 garbage_fraction(默认值 0.5)时,才会触发垃圾回收,将这两个文件 合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:

  1. 锁定这两个文件
  2. 先整理前面的文件的有效数据,再整理后面的文件的有效数据
  3. 将后面文件的有效数据写入到前面的文件中
  4. 更新消息在 ETS 表中的记录
  5. 删除后面文件

image.png


队列结构

通常队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成

rabbit_amqqueue_process 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消 息、处理消息的确认(包括生产端的 confirm 和消费端的 ack)等。

backing_queue 是消息存储的具体形 式和引擎,并向 rabbit_amqqueue_process 提供相关的接口以供调用。

image.png

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费 者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投 递。

rabbit_variable_queue.erl 源码中定义了 RabbitMQ 队列的 4 种状态:

  1. alpha:消息索引和消息内容都存内存,最耗内存,很少消耗 CPU
  2. beta:消息索引存内存,消息内存存磁盘
  3. gama:消息索引内存和磁盘都有,消息内容存磁盘
  4. delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多 CPU 和 I/O 操作

消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发 送变化。

持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种

gama 状态只有持久化消息才会有的状态。

在运行时,RabbitMQ 会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量 (target_ram_count),如果 alpha 状态的消息数量大于此值,则会引起消息的状态转换,多余的消息 可能会转换到 beta、gama 或者 delta 状态。区分这 4 种状态的主要作用是满足不同的内存和 CPU 需求。

对于普通没有设置优先级和镜像的队列来说,backing_queue 的默认实现是 rabbit_variable_queue,其内部通过 5 个子队列 Q1、Q2、delta、Q3、Q4 来体现消息的各个状态。

image.png

image.png

消费者获取消息也会引起消息的状态转换。

当消费者获取消息时

  1. 首先会从 Q4 中获取消息,如果获取成功则返回。
  2. 如果 Q4 为空,则尝试从 Q3 中获取消息,系统首先会判断 Q3 是否为空,如果为空则返回队列 为空,即此时队列中无消息。
  3. 如果 Q3 不为空,则取出 Q3 中的消息;进而再判断此时 Q3 和 Delta 中的长度,如果都为空,则 可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将 Q1 中的消息直接转移至 Q4,下次直接从 Q4 中获取消息。
  4. 如果 Q3 为空,Delta 不为空,则将 Delta 的消息转移至 Q3 中,下次可以直接从 Q3 中获取消息。 在将消息从 Delta 转移到 Q3 的过程中,是按照索引分段读取的,首先读取某一段,然后判断读 取的消息的个数与 Delta 中消息的个数是否相等,如果相等,则可以判定此时 Delta 中己无消 息,则直接将 Q2 和刚读取到的消息一并放入到 Q3 中,如果不相等,仅将此次读取到的消息转 移到 Q3。

这里就有两处疑问,第一个疑问是:为什么 Q3 为空则可以认定整个队列为空?

  1. 试想一下,如果 Q3 为空,Delta 不为空,那么在 Q3 取出最后一条消息的时候,Delta 上的消息 就会被转移到 Q3 这样与 Q3 为空矛盾;
  2. 如果 Delta 为空且 Q2 不为空,则在 Q3 取出最后一条消息时会将 Q2 的消息并入到 Q3 中,这样 也与 Q3 为空矛盾;
  3. 在 Q3 取出最后一条消息之后,如果 Q2、Delta、Q3 都为空,且 Q1 不为空时,则 Q1 的消息会 被转移到 Q4,这与 Q4 为空矛盾。

其实这一番论述也解释了另一个问题:为什么 Q3 和 Delta 都为空时,则可以认为 Q2、Delta、Q3、 Q4 全部为空?

通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可 能只会处于 alpha 状态。

对于持久化消息,它一定会进入 gamma 状态,在开启 publisher confirm 机制时,只有到了 gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续 走到下一个状态。

为什么消息的堆积导致性能下降?

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加 处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息 的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继 而情况变得越来越恶化,使得系统的处理能力大大降低。

应对这一问题一般有 3 种措施:

  1. 增加 prefetch_count 的值,即一次发送多条消息给消费者,加快消息被消费的速度。
  2. 采用 multiple ack,降低处理 ack 带来的开销
  3. 流量控制



1.2 安装和配置 RabbitMQ

安装环境:

  1. 虚拟机软件:VMWare 15.1.0
  2. 操作系统:CentOS Linux release 7.7.1908
  3. Erlang:erlang-23.0.2-1.el7.x86_64
  4. RabbitMQ:rabbitmq-server-3.8.4-1.el7.noarch

RabbitMQ 的安装需要首先安装 Erlang,因为它是基于 Erlang 的 VM 运行的。

RabbitMQ 需要的依赖:socat 和 logrotate,logrotate 操作系统中已经存在了,只需要安装 socat 就 可以了。

RabbitMQ 与 Erlang 的兼容关系详见:

https://www.rabbitmq.com/which-erlang.html

image.png


安装配置启动

#关闭防火墙
#1、安装依赖:
yum install socat -y
#2、安装Erlang
#下载 
#https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_ 64.rpm
#安装
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
#3、安装RabbitMQ
#下载地址:
#https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3. 8.5-1.el7.noarch.rpm
#安装
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
#默认安装位置
/usr/lib/rabbitmq
#跳转到可执行文件位置
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/sbin
#开启UI插件
rabbitmq-plugins enable rabbitmq_management
#启动rabbitmq
systemctl start rabbitmq-server
#或
./rabbitmq-server
#或者后台启动
./rabbitmq-server -detached
#查看帮助
rabbitmqctl help
#查看用户
rabbitmqctl list_users
#添加用户
➜  / rabbitmqctl add_user root 1234
Adding user "root" ...
#设置标签(角色)
➜  / rabbitmqctl set_user_tags root administrator
Setting tags for user "root" to [administrator] ...
#设置权限 对于虚拟主机/赋予配置、读、写权限
➜  / rabbitmqctl set_permissions --vhost / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/" ...
#登陆rabbitmq
http://172.16.94.13:15672/


标签

Tag Capabilities
(None) 没有访问 management 插件的权限
management 可以使用消息协议做任何操作的权限,加上:1. 可以使用 AMQP 协议登录的虚拟主机的权限 2. 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限 3. 查看和关闭它们自己的通道和连接的权限 4. 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动
policymaker 所有 management 标签可以做的,加上: 1. 在它们能通过 AMQP 协议登录的虚拟主机上,查看、创建和删除策略以及虚 拟主机参数的权限
monitoring 所有 management 能做的,加上:1. 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限 2. 查看其他用户连接和通道的权限 3. 查看节点级别的数据如内存使用和集群的权限 4. 查看真正的全局所有虚拟主机统计数据的权限
administrator 所有 policymaker 和 monitoring 能做的,加上:1. 创建删除虚拟主机的权限 2. 查看、创建和删除用户的权限 3. 查看、创建和删除权限的权限 4. 关闭其他用户连接的权限


访问

http://172.16.94.13:15672/

image.png



1.3 RabbitMQ 常用操作命令

# 前台启动Erlang VM和RabbitMQ 
rabbitmq-server
# 后台启动 
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM 
rabbitmqctl stop
# 查看所有队列 
rabbitmqctl list_queues
# 查看所有虚拟主机 
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用 
rabbitmqctl start_app rabbitmqctl stop_app
# 查看节点状态 
rabbitmqctl status
# 查看所有可用的插件 
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户 
rabbitmqctl add_user username password
# 列出所有用户: 
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限: 
rabbitmqctl list_user_permissions username
# 修改密码: 
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath
# 列出所以虚拟主机: 
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用: 
rabbitmqctl reset



1.4 RabbitMQ 工作流程详解



1.4.1 生产者发送消息的流程
  1. 生产者连接 RabbitMQ,建立 TCP 连接( Connection),开启信道(Channel)
  2. 生产者声明一个 Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过 bindingKey (绑定 Key)将交换器和队列绑定( binding )起来
  5. 生产者发送消息至 RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
  6. 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道。
  10. 关闭连接


1.4.2 消费者接收消息的过程
  1. 消费者连接到 RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及 做一些准备工作
  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  4. 消费者确认( ack) 接收到的消息。
  5. RabbitMQ 从队列中删除相应己经被确认的消息。
  6. 关闭信道。
  7. 关闭连接。


1.4.3 案例

image.png

Hello World 一对一的简单模式。生产者直接发送消息给 RabbitMQ,另一端消费。未定义和指定 Exchange 的情况下,使用的是 AMQP default 这个内置的 Exchange。

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.galaxy</groupId>
    <artifactId>mq-demo-send-receive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <target>11</target>
                    <source>11</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

生产者

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author lane
 * @date 2021年08月17日 下午6:46
 * Rabbitmq 是一个消息 broker:接收消息,传递给下游应用
 *
 * 术语:
 * Producing 就是指发送消息,发送消息的程序是 Producer
 * Queue 指的是 RabbitMQ 内部的一个组件,消息存储于 queue 中。queue 使用主机的内存和磁盘存

 储,收到内存和磁盘空间的限制

 * 可以想象为一个大的消息缓冲。很多 Producer 可以向同一个 queue 发送消息,很多消费者

 可以从同一个 queue 消费消息。

 * Consuming 就是接收消息。一个等待消费消息的应用程序称为 Consumer
 *
 * 生产者、消费者、队列不必在同一台主机,一般都是在不同的主机上的应用。一个应用可以同时是

 生产者和消费者。

 *

 */
public class HelloProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明消息队列
        //消息队列名称、是否持久化、是否排他、是否自动删除、属性信息
        channel.queueDeclare("queue.biz",false,false,true,null);

        String message = "hello world" ;
        //发送信息
        channel.basicPublish("", "queue.biz", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道
        channel.close();
        //关闭连接
        connection.close();

    }
}

消费者

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 * @author lane
 * @date 2021年08月17日 下午6:46
 * Rabbitmq 是一个消息 broker:接收消息,传递给下游应用
 *
 * 术语:
 * Producing 就是指发送消息,发送消息的程序是 Producer
 * Queue 指的是 RabbitMQ 内部的一个组件,消息存储于 queue 中。queue 使用主机的内存和磁盘存

 储,收到内存和磁盘空间的限制

 * 可以想象为一个大的消息缓冲。很多 Producer 可以向同一个 queue 发送消息,很多消费者

 可以从同一个 queue 消费消息。

 * Consuming 就是接收消息。一个等待消费消息的应用程序称为 Consumer
 *
 * 生产者、消费者、队列不必在同一台主机,一般都是在不同的主机上的应用。一个应用可以同时是

 生产者和消费者。

 *

 */
public class HelloConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname

        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");

        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明消息队列
        //消息队列名称、是否持久化、是否排他、是否自动删除、属性信息
        channel.queueDeclare("queue.biz",false,false,true,null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //声明交换器exchange
        //交换器名称、类型、是否持久化、是否自动删除、属性集合
        channel.exchangeDeclare("ex.biz", BuiltinExchangeType.DIRECT,false,false,null);
        //将交换器和消息队列绑定
        //消息队列名称、交换器名称、路由key
        channel.queueBind("queue.biz","ex.biz","hello.world");

        /* 使用服务器生成的consumerTag启动本地,非排他的使用者。
        启动一个 仅提供了basic.deliver和basic.cancel AMQP方法(对大多数情形够用了)
        第一个参数:队列名称 autoAck – true 只要服务器发送了消息就表示消息已经被消费者确认;
        false服务 端等待客户端显式地发送确认消息
        deliverCallback – 服务端推送过来的消息回调函数
        cancelCallback – 客户端忽略该消息的回调方法 Returns:
        服务端生成的consumerTag
        */
        channel.basicConsume("queue.biz", true,
                (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }, consumerTag -> { });

        // 消息的推送回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //关闭通道
//        channel.close();
        //关闭连接
//        connection.close();


    }

}

效果

image.png

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello world'
 [x] Received 'hello world'

注意

需要 rabbitMQ 运行且指定 hosts 中的 hostname 和 ip 对应



1.4.4 Connection 和 Channel 关系

生产者和消费者,需要与 RabbitMQ Broker 建立 TCP 连接,也就是 Connection 。一旦 TCP 连接建 立起来,客户端紧接着创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。信道是 建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

image.png

为什么不直接使用 TCP 连接,而是使用信道?

RabbitMQ 采用类似 NIO 的做法,复用 TCP 连接,减少性能开销,便于管理。

当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。 当信道本身的流量很大时,一个 Connection 就会产生性能瓶颈,流量被限制。需要建立多个 Connection ,分摊信道。具体的调优看业务需要。

信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。

channel.exchangeDeclare 
channel.queueDeclare 
channel.basicPublish 
channel.basicConsume

RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。



1.5 RabbitMQ 工作模式代码实现

官网地址:

https://www.rabbitmq.com/getstarted.html



1.5.1 Work Queue

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的 效果。

image.png

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.galaxy</groupId>
    <artifactId>demo-work-queue</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <target>11</target>
                    <source>11</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

NewTask

package com.galaxy.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月18日 下午7:23
 */
public class NewTask {

    private static final String QUEUE_NAME = "";
    private static final String[] works = {

            "hello.", "hello..", "hello...", "hello....",
            "hello.....", "hello......", "hello.......",
            "hello........", "hello.........","hello.........."};

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明消息队列
        //消息队列名称、是否持久化、是否排他、是否自动删除、属性信息
        channel.queueDeclare("lane.task.queue",false,false,false,null);
        for (String work : works) {
            // 将消息路由到taskQueue队列
            channel.basicPublish("", "lane.task.queue", null, work.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + work + "'");
        }
    }

}

worker

package com.galaxy.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月18日 下午7:28
 */
public class Worker {
    private static final String TASK_QUEUE_NAME = "lane.task.queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明消息队列
        //消息队列名称、是否持久化、是否排他、是否自动删除、属性信息
        channel.queueDeclare("queue.biz",false,false,true,null);
        // true表示不需要手动确认消息,false表示需要手动确认消息: channel.basicAck(xxx, yyy);
        boolean autoAck = true;
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, (consumerTag, delivery) -> {
            String task = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + task + "'");
                    try {
                        doWork(task);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                },
                consumerTag -> {});
    }

    private static void doWork(String task) throws InterruptedException {
        System.out.println("task = " + task);
        for (char ch : task.toCharArray())
        { if (ch == '.')
            Thread.sleep(1000); } }
}

效果

 [x] Sent 'hello.'
 [x] Sent 'hello..'
 [x] Sent 'hello...'
 [x] Sent 'hello....'
 [x] Sent 'hello.....'
 [x] Sent 'hello......'
 [x] Sent 'hello.......'
 [x] Sent 'hello........'
 [x] Sent 'hello.........'
 [x] Sent 'hello..........'
·······················work1·····················
 [x] Received 'hello.'
task = hello.
 [x] Received 'hello...'
task = hello...
 [x] Received 'hello.....'
task = hello.....
 [x] Received 'hello.......'
task = hello.......
·······················work2·····················
 [x] Received 'hello..'
task = hello..
 [x] Received 'hello....'
task = hello....
 [x] Received 'hello......'
task = hello......
 [x] Received 'hello........'
task = hello........
 [x] Received 'hello..........'
task = hello..........



1.5.2 发布订阅(广播)模式

使用 fanout 类型交换器,routingKey 忽略。每个消费者定义生成一个队列并绑定到同一个 Exchange,每个消费者都可以消费到完整的消息。

消息广播给所有订阅该消息的消费者。

在 RabbitMQ 中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发 送到哪个队列。

生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换 器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还 是丢弃。规则就是交换器类型。

image.png

交换器的类型前面已经介绍过了:

direct、topic、headers、fanout 四种类型。

发布订阅使 fanout 类型的交换器

创建交换器,名字叫 lane.ex1 :

channel.exchangeDeclare("lane.ex1", "fanout");

fanout 交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道的所有的队列。

rabbitmqctl list_exchanges

列出 RabbitMQ 的交换器,包括了 amq.* 的和默认的(未命名)的交换器。

image.png


未命名交换器


在前面的那里中我们没有指定交换器,但是依然可以向队列发送消息。这是因为我们使用了默认的 交换器。

//交换器的名字为空默认是direct类型的交换器
channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是交换器名称,为空字符串。直接使用 routingKey 向队列发送消息,如果该 routingKey 指定的队列存在的话。

现在,向指定的交换器发布消息:

channel.basicPublish("lane.ex1", "", null, message.getBytes());


临时队列

前面我们使用队列的名称,生产者和消费者都是用该名称来发送和接收该队列中的消息。

首先,我们无论何时连接 RabbitMQ 的时候,都需要一个新的,空的队列。我们可以使用随机的名 字创建队列,也可以让服务器帮我们生成随机的消息队列名字。 其次,一旦我们断开到消费者的连接,该队列应该自动删除。

String queueName = channel.queueDeclare().getQueue();

上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字是服务器随机生成的。 queueName 一般的格式类似: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。


绑定

image.png

在创建了消息队列和 给该队列 fanout 类型的交换器之后,我们需要将两者进行绑定,让交换器将消息发送

channel.queueBind(queueName, "logs", "");

此时 logs 交换器会将接收到的消息追加到我们的队列中

可以使用下述命令列出 RabbitMQ 中交换器的绑定关系:

rabbitmqctl list_bindings

发布订阅模式的整体代码如下:

image.png

依赖同上

发送者 EmitLog

package com.galaxy.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午9:56
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明fanout类型的交换器:每个订阅者都发送一份
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //消息发送
        String message = "今天是星期几啊?";
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));

        System.out.println(" [x] Sent '" + message + "'");

    }

}

接收者 ReceiveLogs

package com.galaxy.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:06
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明fanout类型的交换器:每个订阅者都发送一份
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明一个临时队列,默认direct
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("临时队列名称:"+queueName);
        //绑定交换器和队列、路由key
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        //消息接收
        //队列名称
        //true表示不需要手动确认消息
        // deliverCallback – 服务端推送过来的消息回调函数
        // cancelCallback – 客户端忽略该消息的回调方法
        channel.basicConsume(queueName,true,
                (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'"); },
                consumerTag -> { }
                );

    }


}

运行效果

开启两个消费者,看下消息的接受如下

image.png

绑定信息

~ rabbitmqctl list_bindings --formatter  --pretty_tab  
Listing bindings for vhost /...
┌⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┐
│ source_name │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│             │ exchange    │ amq.gen-MouZD2nDCRlJ7BZMXtzKzQ │ queue            │ amq.gen-MouZD2nDCRlJ7BZMXtzKzQ │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│             │ exchange    │ amq.gen-ZR212p4bv5iaWp2-H7ETGQ │ queue            │ amq.gen-ZR212p4bv5iaWp2-H7ETGQ │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ logs        │ exchange    │ amq.gen-MouZD2nDCRlJ7BZMXtzKzQ │ queue            │                                │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ logs        │ exchange    │ amq.gen-ZR212p4bv5iaWp2-H7ETGQ │ queue            │                                │           │
└⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┘

消息的推拉:

实现 RabbitMQ 的消费者有两种模式,推模式(Push)和拉模式(Pull)。 实现推模式推荐的方式 是继承 DefaultConsumer 基类,也可以使用 Spring AMQP 的 SimpleMessageListenerContainer 。 推 模式是最常用的,但是有些情况下推模式并不适用的,比如说: 由于某些限制,消费者在某个条件成立 时才能消费消息 需要批量拉取消息进行处理 实现拉模式 RabbitMQ 的 Channel 提供了 basicGet 方法用 于拉取消息。



1.5.3 路由模式

使用 direct 类型的 Exchange,发 N 条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange 绑定。此时使用 direct 模式 Exchagne 必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。

上一个模式中,可以将消息广播到很多接收者。

现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到 log 文件,同时在控制台正常打印所有的日志信息。


绑定

上一模式中,交换器的使用方式:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定语句中还有第三个参数:routingKey

channel.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey 的作用与具体使用的交换器类型有关。对于 fanout 类型的交换器,此参数设置无效,系统直接忽略



1.5.4 direct 交换器

分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志 记录。

我们需要将不同日志级别的日志记录交给不同的应用处理。

如何解决?

使用 direct 交换器

如果要对不同的消息做不同的处理,此时不能使用 fanout 类型的交换器,因为它只会盲目的广播消息。

我们需要使用 direct 类型的交换器。


direct 交换器的路由算法很简单:只要消息的 routingKey 和队列的 bindingKey 对应,消息就可以推送给该队列。

image.png

上图中的交换器 X 是 direct 类型的交换器,绑定的两个队列中,一个队列的 bindingKey 是 orange ,另一个队列的 bindingKey 是 black 和 green 。

如此,则 routingKey 是 orange 的消息发送给队列 Q1, routingKey 是 black 和 green 的消息发送给 Q2 队列,其他消息丢弃。


多重绑定

image.png

上图中,我们使用 direct 类型的交换器 X ,建立了两个绑定:队列 Q1 根据 bindingKey 的值 black 绑定到交换器 X ,队列 Q2 根据 bindingKey 的值 black 绑定到交换器 X ;交换器 X 会将消息发 送给队列 Q1 和队列 Q2。交换器的行为跟 fanout 的行为类似,也是广播。

我们将日志级别作为 routing key 进行代码演示

image.png

依赖同上

消息发送者 EmitLogsDirect

package com.galaxy.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:43
 */
public class EmitLogsDirect {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.direct.ex", BuiltinExchangeType.DIRECT);
        //消息发送
        String routingKey = "今天是星期几啊?";
        for (int i = 0; i <30 ; i++) {

            Math.random();
            Random random = new Random();
            int ran = random.nextInt(100);
            switch (ran%3){
                case 0 : routingKey = "info";break;
                case 1 : routingKey = "debug";break;
                case 2 : routingKey = "error";break;
            }
            String message =  "这是 【" + routingKey + "】 的消息,序号是:"+i;
            channel.basicPublish("lane.direct.ex",routingKey,null,message.getBytes("utf-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

info 消息的接收者 ReceiveInfoLogsDirect

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveInfoLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.direct.ex", BuiltinExchangeType.DIRECT);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定info路由的消息
        channel.queueBind(queueName,"lane.direct.ex","info");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        //接收消息
        channel.basicConsume(queueName, deliverCallback, consumerTag -> {});

    }


}

error 消息的接收者 ReceiveErrorLogsDirect

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveErrorLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.direct.ex", BuiltinExchangeType.DIRECT);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定error路由的消息
        channel.queueBind(queueName,"lane.direct.ex","error");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        //接收消息
        channel.basicConsume(queueName, deliverCallback, consumerTag -> {});

    }


}

debug 消息的接收者 ReceiveDebugLogsDirect

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveDebugLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.direct.ex", BuiltinExchangeType.DIRECT);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定debug路由的消息
        channel.queueBind(queueName,"lane.direct.ex","debug");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        //接收消息
        channel.basicConsume(queueName, deliverCallback, consumerTag -> {});

    }

}

运行效果

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VgWm1uTs-1631776584259)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210819111135-26zje2y.png)]

~ rabbitmqctl list_bindings --formatter  pretty_table
Listing bindings for vhost /...
┌⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┐
│ source_name    │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│                │ exchange    │ amq.gen-mgf93OzaVodV3-0UojP3rA │ queue            │ amq.gen-mgf93OzaVodV3-0UojP3rA │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│                │ exchange    │ amq.gen-_n3Q6Z57HgWlINQJRa1inQ │ queue            │ amq.gen-_n3Q6Z57HgWlINQJRa1inQ │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│                │ exchange    │ amq.gen-JDfJApS3OJ82qCGVmuix9g │ queue            │ amq.gen-JDfJApS3OJ82qCGVmuix9g │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.direct.ex │ exchange    │ amq.gen-_n3Q6Z57HgWlINQJRa1inQ │ queue            │ debug                          │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.direct.ex │ exchange    │ amq.gen-JDfJApS3OJ82qCGVmuix9g │ queue            │ error                          │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.direct.ex │ exchange    │ amq.gen-mgf93OzaVodV3-0UojP3rA │ queue            │ info                           │           │
└⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┘


1.5.5 主题模式

使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。

上个模式中,我们通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做?

比如,我想监听 cron 服务发送的 error 消息,又想监听从 kern 服务发送的所有消息。

此时可以使用 RabbitMQ 的主题模式( Topic )。

要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该点分单词字符串最长 255 字节。

bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。

模式匹配有两个不同的符号:


*

号匹配一个单词


#

号匹配多个单词

image.png

上图中,我们发送描述动物的消息。消息发送的时候指定的 routingKey 包含了三个词,两个点。

第一个单词表示动物的速度,第二个是颜色,第三个是物种:..。

创建三个绑定:

  1. Q1 关注 white 颜色动物的消息
  2. Q2 关注兔子的消息
  3. Q3 关注速度快且颜色为黑色的消息

如果不能匹配,就丢弃消息。

如果在 topic 类型的交换器中 bindingKey 使用 # ,则就是 fanout 类型交换器的行为。

如果在 topic 类型的交换器中 bindingKey 中不使用 * 和 # ,则就是 direct 类型交换器的行为。

依赖同上

消息发送者 EmitLogTopic

package com.galaxy.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午11:32
 */
public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "lane.topic.ex";
    private static final String[] SPEED = { "slow","normal","fast" };
    private static final String[] COLOR = { "white","black","colorful" };
    private static final String[] Animal = { "dog","cat","rabbit" };

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //消息发送
        String routingKey = "";
        for (int i = 0; i <30 ; i++) {
            Random random = new Random();
            int ran1 = random.nextInt(100);
            int num1 = ran1%3;
            String speed = SPEED[num1];
            ran1 = random.nextInt(100);
            num1 = ran1%3;
            String color = COLOR[num1];
            ran1 = random.nextInt(100);
            num1 = ran1%3;
            String animal = Animal[num1];
            routingKey=speed+"."+color+"."+animal;
            String message =  "这是 【" + routingKey + "】 的消息,序号是:"+i;
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息接收者之白色的动物

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveWhiteLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.topic.ex", BuiltinExchangeType.TOPIC);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定白色的动物路由消息
        channel.queueBind(queueName,"lane.topic.ex","*.white.*");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        //接收消息
        channel.basicConsume(queueName,true, deliverCallback, consumerTag -> {});

    }


}

消息接受者之兔子 🐰

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveRabbitLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.topic.ex", BuiltinExchangeType.TOPIC);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定rabbit路由的消息
        channel.queueBind(queueName,"lane.topic.ex","#.rabbit");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        //接收消息
        channel.basicConsume(queueName,true, deliverCallback, consumerTag -> {});

    }


}

消息接收者之黑色的跑快快

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年08月19日 上午10:56
 */
public class ReceiveFastBlackLogsDirect {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机名称 hostname 需要在host文件中添加 ip hostname 如 172.16.94.13 mha
        connectionFactory.setHost("mha");
        //设置虚拟主机名称默认 / (在url中的转义字符 %2f)
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("1234");
        //新建tcp连接
        Connection connection = connectionFactory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //声明direct类型的交换器:根据路由发送
        channel.exchangeDeclare("lane.topic.ex", BuiltinExchangeType.TOPIC);
        //创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定速度快且为黑色的动物路由消息
        channel.queueBind(queueName,"lane.topic.ex","fast.black.*");
        //接收消息的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        //接收消息
        channel.basicConsume(queueName,true, deliverCallback, consumerTag -> {});



}

运行效果

image.png

~ rabbitmqctl list_bindings --formatter  pretty_table
Listing bindings for vhost /...
┌⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┬⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┐
│ source_name   │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│               │ exchange    │ amq.gen-4z4Env9d7mpR4FOOkY2DEg │ queue            │ amq.gen-4z4Env9d7mpR4FOOkY2DEg │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│               │ exchange    │ amq.gen-mw4umzeXwld6jQdjNR9XLQ │ queue            │ amq.gen-mw4umzeXwld6jQdjNR9XLQ │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│               │ exchange    │ amq.gen-7qvlHvJJ4K_W1JEXS2tO_g │ queue            │ amq.gen-7qvlHvJJ4K_W1JEXS2tO_g │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.topic.ex │ exchange    │ amq.gen-7qvlHvJJ4K_W1JEXS2tO_g │ queue            │ #.rabbit                       │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.topic.ex │ exchange    │ amq.gen-4z4Env9d7mpR4FOOkY2DEg │ queue            │ *.white.*                      │           │
├⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┼⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┤
│ lane.topic.ex │ exchange    │ amq.gen-mw4umzeXwld6jQdjNR9XLQ │ queue            │ fast.black.*                   │           │
└⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┴⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻┘



1.6 Spring 整合 RabbitMQ

spring-amqp 是对 AMQP 的一些概念的一些抽象,spring-rabbit 是对 RabbitMQ 操作的封装实现。

主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等

RabbitAdmin 类完成对 Exchange,Queue,Binding 的操作,在容器中管理了 RabbitAdmin 类的时候,可以对 Exchange,Queue,Binding 进行自动声明。

RabbitTemplate 类是发送和接收消息的工具类。

SimpleMessageListenerContainer 是消费消息的容器。

目前比较新的一些项目都会选择基于注解方式,而比较老的一些项目可能还是基于配置文件的。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.galaxy</groupId>
    <artifactId>mq-demo-spring-anno</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.2.9.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

添加配置类

package com.galaxy.rabbitmq;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lane
 * @date 2021年08月21日 下午4:28
 */
@Configuration
public class RabbitConfiguration {

    @Bean
    public com.rabbitmq.client.ConnectionFactory rabbitFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();
        rabbitFactory.setHost("mha");
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("root");
        rabbitFactory.setPassword("1234");
        rabbitFactory.setPort(5672);
        return rabbitFactory;

    }

    @Bean
    public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitFactory) {
        ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitFactory);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory factory) {

        AmqpAdmin amqpAdmin = new RabbitAdmin(factory);

        return amqpAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        return rabbitTemplate;
    }

    @Bean
    public Queue queue() {

        Queue myqueue = new Queue("myqueue");

        return myqueue;
    }

}

添加测试类

package com.galaxy.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

/**
 * @author lane
 * @date 2021年08月21日 下午4:32
 */
public class SpringAnnotationDemo {
    public static void main(String[] args) {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
        AmqpTemplate template = context.getBean(AmqpTemplate.class);
       //发送消息
        template.convertAndSend("myqueue", "hello world");
	//接收消息
        String msg = (String) template.receiveAndConvert("myqueue");
        System.out.println(msg);

        context.close();

}

}

测试效果

/Library/Java/JavaVirtualMachines/jdk-11.0.10.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=50525:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/dulane/workspace/startspace/lane-rabbitMQ-61/mq-demo-spring-anno/target/classes:/Users/dulane/.m2/repository/org/springframework/amqp/spring-rabbit/2.2.9.RELEASE/spring-rabbit-2.2.9.RELEASE.jar:/Users/dulane/.m2/repository/com/rabbitmq/amqp-client/5.7.3/amqp-client-5.7.3.jar:/Users/dulane/.m2/repository/org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.jar:/Users/dulane/.m2/repository/org/springframework/amqp/spring-amqp/2.2.9.RELEASE/spring-amqp-2.2.9.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/retry/spring-retry/1.2.5.RELEASE/spring-retry-1.2.5.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-core/5.2.8.RELEASE/spring-core-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-jcl/5.2.8.RELEASE/spring-jcl-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-context/5.2.8.RELEASE/spring-context-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-aop/5.2.8.RELEASE/spring-aop-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-beans/5.2.8.RELEASE/spring-beans-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-expression/5.2.8.RELEASE/spring-expression-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-messaging/5.2.8.RELEASE/spring-messaging-5.2.8.RELEASE.jar:/Users/dulane/.m2/repository/org/springframework/spring-tx/5.2.8.RELEASE/spring-tx-5.2.8.RELEASE.jar com.galaxy.rabbitmq.SpringAnnotationDemo
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello world

Process finished with exit code 0



1.7 SpringBoot 整合 RabbitMQ

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.galaxy</groupId>
    <artifactId>mq-demo-springboot-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq-demo-springboot-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

添加连接信息 application.properties

spring.application.name=springboot_rabbitmq_consumer
#spring.rabbitmq.host=mha
spring.rabbitmq.host=172.16.94.13
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=1234
spring.rabbitmq.port=5672

启动类

package com.galaxy.mqdemospringbootproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MqDemoSpringbootProducerApplication {

    public static void main(String[] args) {
      
        SpringApplication.run(MqDemoSpringbootProducerApplication.class, args);
    }

}

RabbitConfig 类

package com.galaxy.mqdemospringbootproducer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lane
 * @date 2021年08月21日 下午4:47
 */
@Configuration
public class RabbitConfig {

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue123",false,false,false,null);
    }

    @Bean
    public Exchange myExchange() {
        //交换器名称,交换器类型(),是否是持久化的,是否自动删除,交换器属性Map集合
//        new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, false, false, null);
        return new DirectExchange("myex", false, false, null);
    }

    @Bean
    public Binding myBinding() {
        // 绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key, 绑定的属性
        // new Binding("", Binding.DestinationType.EXCHANGE, "", "",null);
        // 绑定了交换器direct.biz.ex到队列myqueue,路由key是 direct.biz.ex
        return new Binding("myqueue123", Binding.DestinationType.QUEUE, "myex", "mykey", null);
    }


}

controller

package com.galaxy.mqdemospringbootproducer.controller;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;

/**
 * @author lane
 * @date 2021年08月21日 下午4:53
 */

@RestController
@RequestMapping("/rabbitmq")
public class ProducerController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/{message}")
    public String sendMsg(@PathVariable("message") String message) throws UnsupportedEncodingException {
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("utf-8")
                .setHeader("hello", "world")
                .build();

        Message msg = MessageBuilder.withBody(message.getBytes("utf-8"))
                .andProperties(messageProperties)
                .build();
        rabbitTemplate.send("myex","mykey",msg);
        rabbitTemplate.convertAndSend("myex", "mykey", message);
        return "ok";
    }

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable("message") String message) {

        rabbitTemplate.convertAndSend("myex", "mykey", "发送:"+message);

        return "send ok";

    }

}


添加消费者

新建 spring boot 项目,选择添加 rabbitmq

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.galaxy</groupId>
    <artifactId>mq-demo-springboot-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq-demo-springboot-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置信息

spring.application.name=springboot_rabbitmq_consumer
#spring.rabbitmq.host=mha
spring.rabbitmq.host=172.16.94.13
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=1234
spring.rabbitmq.port=5672

配置类

package com.galaxy.mqdemospringbootconsumer.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.security.PublicKey;


/**
 * @author lane
 * @date 2021年08月26日 下午3:26
 */
@Configuration
public class ConsumerConfig {

     @Bean
    public Queue myQueue() {
        return new Queue("myqueue123",false,false,false,null);
    }

}

consumer

package com.galaxy.mqdemospringbootconsumer.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * @author lane
 * @date 2021年08月26日 下午2:38
 */
@Component
public class ConsumerListener {


    @RabbitListener(queues = "myqueue123")
    public void service2(@Payload String message , @Header(name = "hello") String name) {
        System.out.println("消息队列推送来的消息:" + message);
        System.out.println("消息队列推送来header hello:" + name);
    }

//    @RabbitListener(queues = "myqueue123")
//    public void service1(@Payload String message) {
//        System.out.println("消息队列推送来的消息:" + message);
//
//    }
}

测试

访问 http://localhost:8080/rabbitmq/abcdef

image.png



第 2 节 RabbitMQ 高级特性解析



2.1 消息可靠性

image.png

你用支付宝给商家支付,如果是个仔细的人,会考虑我转账的话,会不会把我的钱扣了,商家没有 收到我的钱?

一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多 少钱,如果你绑定的是银行卡,可能这个时候你并没有收到支付的确认消息。往往是在一段时间之后, 你会收到银行卡发来的短信,告诉你支付的信息。

支付平台如何保证这笔帐不出问题?

image.png

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。

支付平台通过如下几种方式保证数据一致性:


分布式锁

这个比较容易理解,就是在操作某条数据时先锁定,可以用 redis 或 zookeeper 等常用框架来 实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等 待上一个操作的锁释放后再依次执行。

优点:能够保证数据强一致性。

缺点:高并发场景下可能有性能问题。


消息队列

消息队列是为了保证最终一致性,我们需要确保消息队列有 ack 机制 客户端收到消 息并消费处理完成后,客户端发送 ack 消息给消息中间件 如果消息中间件超过指定时间还没收 到 ack 消息,则定时去重发消息。

比如我们在用户充值完成后,会发送充值消息给账户系统,账户系统再去更改账户余额。

优点:异步、高并发

缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成 功完成,不可能失败。

我们可以从以下几方面来保证消息的可靠性:

  1. 客户端代码中的异常捕获,包括生产者和消费者
  2. AMQP/RabbitMQ 的事务机制
  3. 发送端确认机制
  4. 消息持久化机制
  5. Broker 端的高可用集群
  6. 消费者确认机制
  7. 消费端限流
  8. 消息幂等性


2.11 异常捕获机制

先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过 try catch 方式捕获异常, 在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式, 并无法保证 100% 绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

image.png

另外,可以通过 spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试



2.12 AMQP/RabbitMQ 的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。

一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销 比较大,一般也不推荐使用。

image.png



2.13 发送端确认机制

RabbitMQ 后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。

生产者将信 道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上⾯面发布的消息都会被指派 一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

image.png

RabbitMQ 回传给生产者的确认消息中的 deliveryTag 字段包含了确认消息的序号,另外,通过设 置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息是否都已经得到了处理 了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理 ACK 响 应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条 nack(Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。


代码实现


package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年09月01日 上午11:53
 */
public class Producer {

    public static void main(String[] args) throws Exception, KeyManagementException, URISyntaxException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://root:1234@mha:5672/%2f");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //将当前通道设置为发送方确认通道
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.lane1",true,false,false,null);
        channel.exchangeDeclare("ex.lane1", BuiltinExchangeType.DIRECT,true,false,null);
        channel.queueBind("queue.lane1","ex.lane1","key.lane1");
        //发送消息
        channel.basicPublish("ex.lane1","key.lane1",null,"hello".getBytes("utf-8" ));
        //同步等待mq消息ack
        try {
            channel.waitForConfirmsOrDie(5_000);
            System.out.println("消息已经确认");

        }catch (IOException ioException){
            System.out.println("消息拒绝");
        }
        catch (IllegalStateException illegalStateException){
            System.out.println("消息的通道不是confirm");
        }
        catch (TimeoutException timeoutException){
            System.out.println("等待超时");
        }
        channel.close();
        connection.close();

    }
}

waitForConfirm 方法有个重载的,可以自定义 timeout 超时时间,超时后会抛 TimeoutException。类似的有几个 waitForConfirmsOrDie 方法,Broker 端在返回 nack(Basic.Nack)之 后该方法会抛出 java.io.IOException。需要根据异常类型来做区别处理理, TimeoutException 超时是 属于第三状态(无法确定成功还是失败),而返回 Basic.Nack 抛出 IOException 这种是明确的失败。上 面的代码主要只是演示 confirm 机制,实际上还是同步阻塞模式的,性能并不不是太好。

实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次 waitForConfirms 方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者 nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消 息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发 消息肯定会造成部分消息重复。另外,我们可以通过异步回调的方式来处理 Broker 的响应。 addConfirmListener 方法可以添加 ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含 两个方法:handleAck 和 handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。


批量消息确认

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.util.concurrent.TimeoutException;

/**
 * @author lane
 * @date 2021年09月01日 上午11:53
 */
public class ProducerMulti {

    public static void main(String[] args) throws Exception, KeyManagementException, URISyntaxException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://root:1234@mha:5672/%2f");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //将当前通道设置为发送方确认通道
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        channel.queueDeclare("queue.lane1", true, false, false, null);
        channel.exchangeDeclare("ex.lane1", BuiltinExchangeType.DIRECT, true, false, null);
        channel.queueBind("queue.lane1", "ex.lane1", "key.lane1");
        int batchSize = 10;
        int waitMessageConfirms = 0;
        for (int i = 0; i < 107; i++) {
            //发送消息
            channel.basicPublish("ex.lane1", "key.lane1", null, ("hello" + i).getBytes("utf-8"));
            waitMessageConfirms++;
            if (waitMessageConfirms == batchSize) {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("消息已经确认");
                waitMessageConfirms = 0;
            }

        }
        if (waitMessageConfirms > 0) {
            channel.waitForConfirmsOrDie(5_000);
            System.out.println("剩余消息已经确认");

        }

        channel.close();
        connection.close();
    }
}


springboot 案例

不想写了,直接粘贴上来

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png



2.14 持久化存储机制

持久化是提高 RabbitMQ 可靠性的基础,否则当 RabbitMQ 遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  1. Exchange 的持久化。通过定义时设置 durable 参数为 ture 来保证 Exchange 相关的元数据不不丢失。
  2. Queue 的持久化。也是通过定义时设置 durable 参数为 ture 来保证 Queue 相关的元数据不不 丢失。
  3. 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。

RabbitMQ 中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处 理理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:

  1. 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护 Queue 中消息的信息,包括 消息的存储位置、是否已交给消费者、是否已被消费及 Ack 确认等,每个 Queue 都有与之对应 的 rabbit_queue_index。
  2. 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列 共享,在每个节点中有且只有一个。

下图中 msg_stores/vhosts/$VHostId 这个路路径下包含 queues、msg_store_persistent、 msg_store_transient 这 3 个目录,这是实际存储消息的位置。其中 queues 目录中保存着 rabbit_queue_index 相关的数据,而 msg_store_persistent 保存着持久化消息数据, msg_store_transient 保存着非持久化相关的数据。

另外,RabbitMQ 通过配置 queue_index_embed_msgs_below 可以根据消息大小决定存储位置, 默认 queue_index_embed_msgs_below 是 4096 字节(包含消息体、属性及 headers),小于该值的消息 存在 rabbit_queue_index 中。

image.png


代码实现

package com.galaxy.rabbitmq;

import com.rabbitmq.client.*;
import com.sun.tools.javac.Main;

import java.net.URISyntaxException;
import java.security.KeyManagementException;

/**
 * @author lane
 * @date 2021年09月01日 下午4:03
 */
public class PersistentProducer {

    public static void main(String[] args) throws Exception, KeyManagementException, URISyntaxException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://root:1234@mha:5672/%2f");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //将当前通道设置为发送方确认通道、
        AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
        //参数分别为queue、持久化、排外、自删除、参数
        channel.queueDeclare("queue.lane2", true, false, false, null);
        //参数分别为exchange、类型、持久化、自删除、参数
        channel.exchangeDeclare("ex.lane2", BuiltinExchangeType.DIRECT, true, false, null);
        channel.queueBind("queue.lane2", "ex.lane2", "key.lane2");
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)//2代表持久化消息
                .build();

        channel.basicPublish("ex.lane2","key.lane2",properties,"hello".getBytes());
        channel.close();
        connection.close();
    }
}


2.15 Consumer ACK

如何保证消息被消费者成功消费? 前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的 可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我 们又没有任何重试,那结果跟消息丢失没什么分别。

RabbitMQ 在消费端会有 Ack 机制,即消费端消费消息后需要发送 Ack 确认报文给 Broker 端,告知自 己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO 模式)。

这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。

一般而言,我们有如下处理手段:

  1. 采用 NONE 模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风 险
  2. 采用 AUTO(自动 Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue 中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新 被消费,默认会一直重发消息并直到消费完成返回 Ack 或者一直到过期
  3. 采用 MANUAL(手动 Ack)模式,消费者自行控制流程并手动调用 channel 相关的方法返回 Ack
/* NONE 模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO 模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL 模式,需要显式的调用当前 channel 的 basicAck 方法
* @param channel
* @param deliveryTag
* @param message

*/

@RabbitListener(queues = "la.topic.queue", ackMode = "AUTO")

public void handleMessageTopic(Channel channel,

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload byte[]message) {

System.out.println("RabbitListener 消费消息,消息内容:" + new String((message)));

try {

// 手动 ack,deliveryTag 表示消息的唯一标志,multiple 表示是否是批量确认

channel.basicAck(deliveryTag, false);

// 手动 nack,告诉 broker 消费者处理失败,最后一个参数表示是否需要将消息重新入列

channel.basicNack(deliveryTag, false, true);

// 手动拒绝消息。第二个参数表示是否重新入列

channel.basicReject(deliveryTag, true);

} catch (IOException e) {

e.printStackTrace();

}

}

上面是通过在消费端直接配置指定 ackMode,在一些比较老的 spring 项目中一般是通过 xml 方式去 定义、声明和配置的,不管是 XML 还是注解,相关配置、属性这些其实都是大同小异,触类旁通。然后 需要注意的是 channel.basicAck 这几个手工 Ack 确认的方法。

image.png

SpringBoot 项目中支持如下的一些配置:

#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true
#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000
# 重试超过最大次数后是否拒绝 
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual

本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式

image.png


springboot 完整案例

image.png

image.png

image.png

image.png

image.png

image.png



2.16 消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如 何削峰?

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的, 但这个能力是有容量限制的, 如果长期运行并没有任何处理, 连锁反应那就会很悲剧… 最终会导致 Broker 崩溃

下面我将从多个角度介绍 QoS 与限流,防止上面的悲剧发生。


RabbitMQ 可以对内存和磁盘使用量设置阈值

当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的 Broker 被压垮。当内 存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已 连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在 rabbitmqctl 和 管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着 它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

在/etc/rabbitmq/rabbitmq.conf 中配置磁盘可用空间大小:

image.png

image.png


基于 credit flow 的流控机制

面向每一个连接进行流控。当单 个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可 能是因为 connection、channel、queue 的某一个过程处于 flow 状态,这些状态都可以从监控 平台看到。

image.png

image.png

image.png

image.png


QoS 保证机制

RabbitMQ 中有一种 QoS 保证机制,可以限制 Channel 上接收到的未被 Ack 的消息数量,如果 超过这个数量限制 RabbitMQ 将不会再往消费端推送消息。这是一种流控手段,可以防止大量 消息瞬时从 Broker 送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS 机制仅对于消费端推模式有效,对拉模式无效。而且不支持 NONE Ack 模式。

执行 channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。消息的发 送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置 Qos 的 prefetchCount,它表示 broker 在向消费者发送消息的时候,一旦发送了 prefetchCount 个消 息而没有一个消息确认的时候,就停止发送。

消费者确认一个,broker 就发送一个,确认两 个就发送两个

。换句话说,消费者确认多少,broker 就发送多少,消费者等待处理的个数永 远限制在 prefetchCount 个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了 multiple 为 true,消费者在确认的时候,比如说 id 是 8 的消息确认了,则在 8 之前的所有消息都 确认了。

提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间(需要时间)
  2. 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
  3. 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)

image.png

image.png



2.17 消息可靠性保障

在讲高级特性的时候几乎已经都涉及到了,这里简单回顾总结下:

  1. 消息传输保障
  2. 各种限流、应急手段
  3. 业务层面的一些容错、补偿、异常重试等手段

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障 分为三个层级

  1. At most once:最多一次。消息可能会丢失,但绝不会重复传输
  2. At least once:最少一次。消息绝不会丢失,但可能会重复传输
  3. Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

RabbitMQ 支持其中的“最多一次”和“最少一次”。

其中“最少一次”投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队 列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息 丢失。
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经 正确消费的消息,以避免在消费端引起不必要的消息丢失。

“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确 保消息不会丢失。(估计有不少公司的业务系统都是这样的,想想都觉得可怕)

“恰好一次”是 RabbitMQ 目前无法保障的。

考虑这样一种情况,消费者在消费完一条消息之后向 RabbitMQ 发送确认 Basic.Ack 命令,此时由 于网络断开或者其他原因造成 RabbitMQ 并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标 记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。

再考虑一种情况,生产者在使用 publisher confirm 机制的时候,发送完一条消息等待 RabbitMQ 返 回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。



2.18 消息幂等性处理

刚刚我们讲到,追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而 导致重复消费…真是应证了那句老话:做架构就是权衡取舍。

RabbitMQ 层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息 中间件都没有实现。

借用淘宝沈洵的一句话:最好的解决办法就是不去解决。当为了在基础的分布式中间件中实现某种 相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办 法就是交给业务自己去处理。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收 集,而对一些金融类的业务则要求比较严苛。

一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。

幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC 框架调用超时后会重试,HTTP 请求会重复发起(用户手抖多点了几下按钮) 幂等(Idempotence)是一个数学上的概念,它是这样定义的:

**如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。**这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方 法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。

对于幂等的方法,不用担心重复执行会对系统造成任何改变。

举个简单的例子(在不考虑并发问题的情况下):

select * from xx where id=1 
delete from xx where id=1

这两条 sql 语句就是天然幂等的,它本身的重复执行并不会引起什么改变。而 update 就要看情况 的,

update xxx set amount = 100 where id =1

这条语句执行 1 次和 100 次都是一样的结果(最终余额都还是 100),所以它是满足幂等性的。

update xxx set amount = amount + 100 where id =1

它就不满足幂等性的。

业界对于幂等性的一些常见做法:

  1. 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不 重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动 账户、变动金额等 3 个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成 的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回 滚。现实中,数据库唯一索引的方式通常做为兜底保证;
  2. 前置检查机制。这个很容易理解,并且有几种实现办法。还是引用上面转账的例子,当我在执 行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者 Tair 中)中是否已经存在 这笔交易相关的记录了, select * from xxx where accountNumber=xxx and orderId=yyy ,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止 并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操 作。当然,我们也可以使用乐观锁或 CAS 机制,乐观锁一般会使用扩展一个版本号字段做判断 条件
  3. 唯一 Id 机制,比较通用的方式。对于每条消息我们都可以生成唯一 Id,消费前判断 Tair 中是否 存在(MsgId 做 Tair 排他锁的 key),消费成功后将状态写入 Tair 中,这样就可以防止重复消费 了。

对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类 GUID 的请求号 (或 TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请 求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就 变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请 求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求 ID 都不存在或 者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上 游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求 ID。同样也需要利用这个请求 ID 做分布式锁的 KEY 实现排他。



2.2 可靠性分析

在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的 机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。


Firehose

在 RabbitMQ 中可以使用 Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费 消息的记录,方便 RabbitMQ 的使用者进行调试、排错等。

Firehose 的原理是将生产者投递给 RabbitMQ 的消息,或者 RabbitMQ 投递给消费者的消息按照指 定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace ,它是一个 topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver. {queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

开启 Firehose 命令:

rabbitmqctl trace_on [-p vhost]

其中[-p vhost]是可选参数,用来指定虚拟主机 vhost。

对应的关闭命令为:

rabbitmqctl trace_off [-p vhost]

默认情况下处于关闭状态,并且 Firehose 的状态是非持久化的,会在 RabbitMQ 服务重启 的时候还原成默认的状态。Firehose 开启之后多少会影响 RabbitMQ 整体服务性能,因为它会引起额 外的消息生成、路由和存储。

image.png

image.png


rabbitmq_tracing

rabbitmq_tracing 插件相当于 Firehose 的 GUI 版本,它同样能跟踪 RabbitMQ 中消息的流入流出 情况。rabbitmq_tracing 插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的 trace 文件中。

可以使用如下命令来启动 rabbitmq_ tracing 插件

rabbitmq-plugins enable rabbitmq_tracing

关闭该插件

rabbitmq-plugins disable rabbitmq_tracing

image.png

Name 表示 rabbitmq_tracing 的一个条目的名称,Format 可以选择 Text 或 JSON,连接的用户名写 root,密码写 123456。

Pattern:发布的消息:publish.

Pattern:消费的消息:deliver.

image.png



2.3 TTL 机制

image.png

在京东下单,订单创建成功,等待支付,一般会给 30 分钟的时间,开始倒计时。如果在这段时间内 用户没有支付,则默认订单取消。

该如何实现?


定期轮询(数据库等)

用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更 改数据库状态。定期轮询数据库支付状态,如果超过 30 分钟就将该订单取消。

优点:设计实现简单

缺点:需要对数据库进行大量的 IO 操作,效率低下。


Timer

image.png

缺点:

Timers 没有持久化机制.

Timers 不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)

Timers 不能利用线程池,一个 timer 一个线程

Timers 没有真正的管理计划


ScheduledExecutorService

image.png

优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。

在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。

RabbitMQ 使用 TTL

Quartz

Redis Zset

JCronTab

SchedulerX

。。。

TTL,Time to Live 的简称,即过期时间。 RabbitMQ 可以对消息和队列两个维度来设置 TTL。

任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种 过期的机制来做兜底。

目前有两种方法可以设置消息的 TTL。

  1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息自身进行单独设置,每条消息的 TTL 可以不同。

如果两种方法一起使用,则消息的 TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存 时间一旦超过设置的 TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当 然,“死信”也是可以被取出来消费的,下一小节我们会讲解。

image.png

此外,还可以通过命令行方式设置全局 TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  1. 如果不设置 TTL,则表示此消息不会过期;
  2. 如果 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢 弃;

一般 TTL 相关的参数单位都是毫秒(ms)


SpringBoot 代码实现

创建 springboot 项目名称 mq-demo-springboot-ttl

image.png

配置文件

spring.application.name=ttl
spring.rabbitmq.host=mha
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=1234
spring.rabbitmq.port=5672

启动类 SpringBootTTLApplication

package com.galaxy.config;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author lane
 * @date 2021年09月02日 下午3:30
 */
@SpringBootApplication
public class SpringBootTTLApplication {

    public static void main(String[] args) {

        SpringApplication.run(SpringBootTTLApplication.class,args);

    }
}

配置类

package com.galaxy.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lane
 * @date 2021年09月02日 下午3:29
 */
@Configuration
public class RabbitConfig {

    private Queue queueTTL;
    private Exchange exchangeTTL;
    private Queue queueWait;
    private Exchange exchangeWait;

    @Bean
    public Queue queueTTLWaiting() {

        Map<String, Object> props = new HashMap<>();

        // 对于该队列中的消息,设置都等待 10s
        props.put("x-message-ttl", 10000);

        Queue queue = new Queue("q.pay.ttl-waiting", true, false, false, props);
        queueTTL = queue;

        return queue;

    }
    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", true, false, false);
        queueWait = queue;
        return queue;

    }

    @Bean public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttl-waiting", true, false);
        exchangeTTL = exchange;
        return exchange; }

    /** * 该交换器使用的时候,需要给每个消息设置有效期 * @return */
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.waiting", true, false);
        exchangeWait = exchange;
        return exchange; }

    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder
                .bind(queueTTL)
                .to(exchangeTTL)
                .with("pay.ttl-waiting").noargs(); }

    @Bean
    public Binding bindingWaiting() {
        return BindingBuilder
                .bind(queueWait)
                .to(exchangeWait)
                .with("pay.waiting").noargs(); }



}

访问入口

package com.galaxy.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;

/**
 * @author lane
 * @date 2021年09月02日 下午4:10
 */
@RestController
@RequestMapping("/pay")
public class PayController {

    @Autowired
    AmqpTemplate rabbitTemplate;

    @RequestMapping("/queuettl")
    public String sendMessage() {

        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttl-waiting", "发送了TTL-WAITING-MESSAGE");

        return "queue-ttl-ok";

    }
    @RequestMapping("/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {

        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");

        Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);

        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok"; }

}

原先的 consumer

--------config---------------

/**
 * @author lane
 * @date 2021年08月26日 下午3:26
 */
@Configuration
public class ConsumerConfig {


    @Bean
    public Queue queueTTLWaiting() {

        Map<String, Object> props = new HashMap<>();

        // 对于该队列中的消息,设置都等待 10s
        props.put("x-message-ttl", 10000);

        Queue queue = new Queue("q.pay.ttl-waiting", true, false, false, props);


        return queue;

    }
    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", true, false, false);

        return queue;

    }
}
--------listener----
package com.galaxy.mqdemospringbootconsumer.consumer;


/**
 * @author lane
 * @date 2021年08月26日 下午2:38
 */
@Component
public class ConsumerListener {

//    @RabbitListener(queues = "myqueue2")
//    public void service(@Payload String message , @Header(name = "hello") String name) {
//        System.out.println("消息队列推送来的消息:" + message);
//        System.out.println("消息队列推送来header hello:" + name);
//    }

    @RabbitListener(queues = "myqueue")
    public void service(@Payload String message) {
        System.out.println("消息队列推送来的消息:" + message);

    }
    @RabbitListener(queues = "q.pay.ttl-waiting")
    public void service123(@Payload String message) {
        System.out.println("消息队列推送来的消息123:" + message);

    }@RabbitListener(queues = "q.pay.waiting")
    public void service1(@Payload String message) {
        System.out.println("消息队列推送来的消息123:" + message);

    }
}

测试效果

如果接收消息

image.png

如果不接收消息

image.png

到时间之后,立刻消失掉了

image.png



2.4 死信队列

image.png

用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单 系统 采用 MQ 异步通讯。 在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消 息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。

DLX,全称为 Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter) 之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定 DLX 的队列就称为“死信队列”。

以下几种情况导致消息变为死信:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;
  2. 消息过期;
  3. 队列达到最大长度。

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被 消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中的情况,后 续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善 和优化系统。

image.png


springboot 案例实现

image.png

spring.application.name=springboot_rabbitmq_consumer
spring.rabbitmq.host=mha
#spring.rabbitmq.host=172.16.94.13
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=1234
spring.rabbitmq.port=5672

主入口类

package com.galaxy.mqdemospringbootproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MqDemoSpringbootProducerApplication {

    public static void main(String[] args) {

        SpringApplication.run(MqDemoSpringbootProducerApplication.class, args);
    }

}

image.png

image.png

image.png



2.5 延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消 费。

例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留 15 分钟时 间,如果 15 分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?

  1. 可以用定时任务每分钟扫一次,发现有占座超过 15 分钟还没付款的就释放掉。但是这样做很 低效,很多时候做的都是些无用功;
  2. 可以用分布式锁、分布式缓存的被动过期时间,15 分钟过期后锁也释放了,缓存 key 也不存在 了;
  3. 还可以用延迟队列,锁座成功后会发送 1 条延迟消息,这条消息 15 分钟后才会被消费,消费的 过程就是检查这个座位是否已经是“已付款”状态;

你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚 22 点准时参加会有。系统 还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚 21:45 分的时候(提前 15 分钟)就 会通知提醒参会人员做好参会准备,会议马上开始…

同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会 议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比 较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾 的是,在 AMQP 协议和 RabbitMQ 中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍 的“死信队列”来变相的实现。

可以使用 rabbitmq_delayed_message_exchange 插件实现。

这里和 TTL 方式有个很大的不同就是 TTL 存放消息在死信队列(delayqueue)里,二基于插件存放消息 在延时交换机里(x-delayed-message exchange)。

image.png

  1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列 (queue)并把消息给它
  3. 队列(queue)再把消息发送给监听它的消费者(customer)
  4. 下载插件 下载地址:

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

image.png

  1. 安装插件 将插件拷贝到 rabbitmq-server 的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
  2. 启用插件
rabbitmq-plugins list 
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  1. 重启 rabbitmq-server
systemctl restart rabbitmq-server
  1. 编写代码

    image.png

    spring.application.name=springboot_rabbitmq_consumer
    spring.rabbitmq.host=mha
    #spring.rabbitmq.host=172.16.94.13
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.username=root
    spring.rabbitmq.password=1234
    spring.rabbitmq.port=5672
    
    

    SpringBootApplication 主入口类

image.png

RabbitMQ 的对象配置

image.png

image.png

image.png

  1. 结果:按照时长倒序发送请求,结果时间先到的先消费。



第 3 节 RabbitMQ 集群与运维



3.1 集群方案原理

对于无状态应用(如普通的微服务)很容易实现负载均衡、高可用集群。而对于有状态的系统(如 数据库等)就比较复杂。


业界实践

主备模式:单活,容量对等,可以实现故障转移。使用独立存储时需要借助复制、镜像同步等技 数据会有延迟、不一致等问题(CAP 定律),使用共享存储时就不会有状态同步这个问题。

主从模式:一定程度的双活, 容量对等, 最常见的是读写分离。 通常也需要借助复制技术, 或者要 求上游实现双写来保证节点数据一致。

主主模式:两边都可以读写,互为主备。如果两边同时写入很容易冲突,所以通常实现的都是“伪 主主模式”,或者说就是主从模式的升级版,只是新增了主从节点的选举和切换。

分片集群:不同节点保存不同的数据, 上游应用或者代理节点做路由, 突破存储容量限制, 分摊读写负载;典型的如 MongoDB 的分片、MySQL 的分库分表、Redis 集群。

异地多活:“两地三中心”是金融行业经典的容灾模式(有资源闲置的问题),“异地多活”才是王道


常用负载均衡算法

  1. 随机
  2. 轮询
  3. 加权轮询
  4. 最少活跃连接
  5. 原地址/目标地址 hash(一致性 hash)


集群中的经典问题

脑裂(可以通过协调器选举算法、仲裁节点等方式来解决)

网络分区、一致性、可用性(CAP)

相关景点的技术和工具:LVS、HAProxy、Nginx、KeepAlived、Heartbeat、DRBD、Corosync、 Pacemaker、MMM/MHA、Galera、MGR 等,感兴趣的同学可以研究。

现在太多公司选择直接购买公有云服务,基本不用太关心很多基础设施和中间件的部署、运维细 节。但是这些技术以及背后的原理是非常重要的。


RabbitMQ 分布式架构模式

主备模式

也叫 Warren(兔子窝)模式,同一时刻只有一个节点在工作(备份节点不能读写),当主节点发 生故障后会将请求切换到备份节点上(主恢复后成为备份节点)。需要借助 HAProxy 之类的(VIP 模 式)负载均衡器来做健康检查和主备切换,底层需要借助共享存储(如 SAN 设备)。

这不是 RabbitMQ 官方或者开源社区推荐方案,适用于访问压力不是特别大但是又有高可用架构需 求(故障切换)的中小规模的系统来使用。首先有一个节点闲置,本身就是资源浪费,其次共享存储往 往需要借助硬件存储,或者分布式文件系统。

image.png


Shovel 铲子模式

Shovel 是一个插件,用于实现跨机房数据复制,或者数据迁移,故障转移与恢复等。

如下图,用户下单的消费先是投递在 Goleta Broker 实例中,当 Goleta 实例达到触发条件后(例 如:消息堆积数达到阈值)会将消息放到 Goleta 实例的 backup_orders 备份队列中,并通过 Shovel 插件 从 Goleta 的 backup_orders 队列中将消息拉取到 Carpinteria 实例存储。

image.png

image.png

使用 Shovel 插件后,模型变成了近端同步确认,远端异步确认的方式。

此模式支持 WAN 传输,并且 broker 实例的 RabbitMQ、Erlang 版本不要求完全一致。

Shovel 的配置分静态模式(修改 RabbitMQ 配置)和动态模式(在控制台直接部署,重启后失效)


RabbitMQ 集群

RabbitMQ 集群允许消费者和生产者在 RabbitMQ 单个节点崩溃的情况下继续运行,并可以通过添 加更多的节点来线性扩展消息通信的吞吐量。当失去一个 RabbitMQ 节点时,客户端能够重新连接到集 群中的任何其他节点并继续生产和消费。

RabbitMQ 集群中的所有节点都会备份所有的元数据信息,包括:

  1. 队列元数据:队列的名称及属性;
  2. 交换器:交换器的名称及属性;
  3. 绑定关系元数据:交换器与队列或者交换器与交换器之间的绑定关系;
  4. vhost 元数据:为 vhost 内的队列、交换器和绑定提供命名空间及安全属性。

基于存储空间和性能的考虑,RabbitMQ 集群中的各节点存储的消息是不同的(有点儿类似分片集 群,各节点数据并不是全量对等的),各节点之间同步备份的仅仅是上述元数据以及 Queue Owner(队列所有者,就是实际创建 Queue 并保存消息数据的节点)的指针。当集群中某个节点崩溃 后,该节点的队列进程和关联的绑定都会消失,关联的消费者也会丢失订阅信息,节点恢复后(前提是 消息有持久化)消息可以重新被消费。虽然消息本身也会持久化,但如果节点磁盘存储设备发生故障那 同样会导致消息丢失。

总的来说,该集群模式只能保证集群中的某个 Node 挂掉后应用程序还可以切换到其他 Node 上继续 地发送和消费消息,但无法保证原有的消息不丢失,所以并不是一个真正意义的高可用集群。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UKg5PJmp-1631776584295)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905192745-dbx0nr9.png)]

这是 RabbitMQ 内置的集群模式,Erlang 语言天生具备分布式特性,所以不需要借助类似 Zookeeper 之类的组件来实现集群(集群节点间使用 cookie 来进行通信验证,所有节点都必须使用相同 的 .erlang.cookie 文件内容),不同节点的 Erlang、RabbitMQ 版本必须一致。


镜像队列模式

前面我们讲了,RabbitMQ 内置的集群模式有丢失消息的风险,“镜像队列”可以看成是对内置默认 集群模式的一种高可用架构的补充。可以将队列镜像(同步)到集群中的其他 broker 上,相当于是多副 本冗余。如果集群中的一个节点失效,队列能自动地切换到集群中的另一个镜像节点上以保证服务的可 用性,而且消息不丢失。

在 RabbitMQ 镜像队列中所谓的 master 和 slave 都仅仅是针对某个 queue 而言的,而不是 node。一 个 queue 第一次创建所在的节点是它的 master 节点,其他节点为 slave 节点。如果 master 由于某种原因 失效,最先加入的 slave 会被提升为新的 master。

无论客户端请求到达 master 还是 slave,最终数据都是从 master 节点获取。当请求到达 master 节点 时,master 节点直接将消息返回给 client,同时 master 节点会通过 GM(Guaranteed Multicast)协议 将 queue 的最新状态广播到 slave 节点。GM 保证了广播消息的原子性,即要么都更新要么都不更新。当 请求到达 slave 节点时,slave 节点需要将请求先重定向到 master 节点,master 节点将消息返回给 client,同时 master 节点会通过 GM 协议将 queue 的最新状态广播到 slave 节点。

很多同学可能就会疑惑,这样设计太傻叉了,slave 完全是闲置的啊!干嘛不学习 MySQL 主从复 制,起码可以搞个读写分离啊!其实业界很多 HA 架构实践中冗余资源都是闲置的。前面我们讲了 RabbitMQ 镜像队列中的 master、slave 是 Queue 维度而并非 Node 维度,所以我们可以交叉减少资源限 制,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JTz7UPFX-1631776584296)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905193013-hbq96go.png)]


Federation 联邦模式

Federation 和 Shovel 类似,也是一个实现跨集群、节点消息同步的插件。支持联邦交换器、联邦队 列(作用在不同级别)。

Federation 插件允许你配置一个 exchanges federation 或者 queues federation。

一个 exchange/queues federation 允许你从一个或者多个 upstream 接收信息,就是远程的 exchange/queues。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A82GWmJ0-1631776584297)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905193136-dvpcyhi.png)]

无论是 Federation 还是 Shovel 都只是解决消息数据传输的问题(当然插件自身可能会一些应用层的 优化),跨机房跨城市的这种网络延迟问题是客观存在的,不是简单的通过什么插件可以解决的,一般 需要借助昂贵的专线。

很多书籍和文章中存在误导大家的,可能会说 Federation/Shovel 可以解决延迟的问题,可以实现 异地多活等等,其实这都是错误的。而且我可以负责的告诉大家,他们所谓的“异地多活”并非大厂最 佳实践。

例如:使用 Shovel 构建集群,RabbitMQ 和应用程序都选择双机房部署时,当杭州机房发生了消息 积压后超出阈值部分的消息就会被转发到上海机房中,此时上海机房的应用程序直接消费掉上海机房 RabbitMQ 的消息,这样看起来上海机房是可以分摊负载,而且一定程度上实现“双机房多活”的。但是 数据库呢?选择两边都部署还是仅部署在某个机房呢?两边同时写入是很容易造成冲突的,如果数据库 仅仅部署在杭州机房,那么数据库也可能成为瓶颈导致消费速度依然上不去,只不过是多了上海机房中 的消费者实例节点而已。

而使用 Federation 模式呢?如果要真正要实现“双机房多活”那么应用程序也是多机房的,那某些 Exchange/Queue 中的消息会在两边机房都有,两边机房的应用程序都会同时消息,那必然会造成重复 消息!


异地多活架构

方 案 容量 容灾 成本
异 地 多 活 [优]基于逻辑机房,容量 可伸缩的云微架构

[优]容量可异地伸缩
[优]日常运行,容灾时可用 性高。

[劣]受城际网络故障影响, 影响度取决于横向依赖程 度
[优]IDC、应用等成本在日常得到有效利用
两 地 三 中 心 [劣]仅可部署在一个城市,容量伸缩有城市级 瓶颈 [劣]灾备设施冷备等待,容 灾时可用性低。 [劣]容灾设施等成本仅在容灾时才使用,且受限于可用 性



3.2 单机多实例部署

在单机版基础上 ,也就是一台 Linux 虚拟机上启动多个 RabbitMQ 实例,部署集群。

在单个 Linux 虚拟机上运行多个 RabbitMQ 实例:

多个 RabbitMQ 使用的端口号不能冲突

多个 RabbitMQ 使用的磁盘存储路径不能冲突

多个 RabbitMQ 的配置文件也不能冲突

多个 RabbitMQ 虚拟主机的名称不能重复


RABBITMQ_NODE_PORT

用于设置 RabbitMQ 的服务发现,对外发布的其他端口在这个端口基础上计算得来。

端口号 说明
4369 epmd,RabbitMQ 节点和 CLI 工具使用的对等发现服务
5672、 5671 分别为不带 TLS 和带 TLS 的 AMQP 0-9-1 和 1.0 客户端使用
25672 用于节点间和 CLI 工具通信(Erlang 分发服务器端口),并从动态范围分配(默认情 况下限制为单个端口,计算为 AMQP 端口 + 20000)。一般这些端口不应暴露出去。
35672-35682 由 CLI 工具(Erlang 分发客户端端口)用于与节点进行通信,并从动态范围(计算为 服务器分发端口 + 10000 通过服务器分发端口 + 10010)分配。
15672 HTTP API 客户端,管理 UI 和 Rabbitmqadmin(仅在启用了管理插件的情况下)
61613、 61614 不带 TLS 和带 TLS 的 STOMP 客户端(仅在启用 STOMP 插件的情况下)
1883、 8883 如果启用了 MQTT 插件,则不带 TLS 和具有 TLS 的 MQTT 客户端
15674 STOMP-over-WebSockets 客户端(仅在启用了 Web STOMP 插件的情况下)
15675 MQTT-over-WebSockets 客户端(仅在启用 Web MQTT 插件的情况下)
15692 Prometheus 指标(仅在启用 Prometheus 插件的情况下)

RABBITMQ_NODENAME

用于设置 RabbitMQ 节点名称 @ 前缀是用户名,@ 后缀是 RabbitMQ 所在的 Linux 主机的 hostname

数据存储目录

在这里插入图片描述

日志数据存储目录

在这里插入图片描述


RabbitMQ 使用的环境变量

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2n9pO475-1631776584298)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905194203-trk7i6m.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CGbuj6Jp-1631776584299)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905194230-nz552au.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tyJZMuKN-1631776584299)(https://b3logfile.com/siyuan/1619927307428/assets/image-20210905194249-jhx6plj.png)]


开始单机多部署

下载 rabbitMQ 的源码


https://github.com/rabbitmq/rabbitmq-server/releases

在下面路径下找到 rabbitmq.conf.example

/Users/dulane/Downloads/SafariDownload/rabbitmq-server-3.8.22/deps/rabbit/docs

因为是通过 rpm 安装的 rabbitmq 没有 rabbitmq.conf 文件

参考 rabbitmq.conf.example 这个文件新建 rabbitmq.conf 文件


具体操作

➜  ~ cd /opt
➜  /opt mkdir rabbitconf
➜  /opt ls
rabbitconf
➜  /opt ll
总用量 0
drwxr-xr-x. 2 root root 6 96 18:02 rabbitconf
➜  /opt id rabbitmq
uid=997(rabbitmq) gid=995(rabbitmq)=995(rabbitmq)
#更改权限组
➜  /opt chown :rabbitmq -R rabbitconf/
➜  /opt ll  
总用量 0
drwxr-xr-x. 2 root rabbitmq 6 96 18:02 rabbitconf
➜  /opt cd rabbitconf 
#创建配置文件rabbit.conf
➜  rabbitconf touch rabbit1.conf rabbit2.conf rabbit3.conf
➜  rabbitconf ll
总用量 0
-rw-r--r--. 1 root root 0 96 18:04 rabbit1.conf
-rw-r--r--. 1 root root 0 96 18:04 rabbit2.conf
-rw-r--r--. 1 root root 0 96 18:04 rabbit3.conf
#更改权限组
➜  rabbitconf chown :rabbitmq rabbit*.conf
➜  rabbitconf ll
总用量 0
-rw-r--r--. 1 root rabbitmq 0 96 18:04 rabbit1.conf
-rw-r--r--. 1 root rabbitmq 0 96 18:04 rabbit2.conf
-rw-r--r--. 1 root rabbitmq 0 96 18:04 rabbit3.conf
➜  rabbitconf  
#分别修改
vim rabbit1.conf
vim rabbit2.conf
vim rabbit3.conf
#添加如下内容 一个是management ui插件的访问端口默认15672 第二个guest 访问默认不允许远程访问
management.tcp.port = 6001
loopback_users.guest = false
management.tcp.port = 6002
loopback_users.guest = false
management.tcp.port = 6003
loopback_users.guest = false
#分别启动三个rabbitmq
#也可以这种方式 export RABBITMQ_NODE_PORT=5001 export RABBITMQ_NODENAME=rabbit1  rabbitmq-server
RABBITMQ_NODENAME=rabbit1 RABBITMQ_NODE_PORT=5001 RABBITMQ_CONFIG_FILE=/opt/rabbitconf/rabbit1.conf rabbitmq-server
RABBITMQ_NODENAME=rabbit2 RABBITMQ_NODE_PORT=5002 RABBITMQ_CONFIG_FILE=/opt/rabbitconf/rabbit2.conf rabbitmq-server
RABBITMQ_NODENAME=rabbit3 RABBITMQ_NODE_PORT=5003 RABBITMQ_CONFIG_FILE=/opt/rabbitconf/rabbit3.conf rabbitmq-server
#分别访问 用户名密码guest
http://172.16.94.13:6001/
http://172.16.94.13:6002/
http://172.16.94.13:6003/
#查看生成的文件
➜  ~ cd /var/lib/rabbitmq
➜  rabbitmq ls
erl_crash.dump  mnesia
➜  rabbitmq cd mnesia
➜  mnesia ls
rabbit1@rabbitmq                 rabbit2@rabbitmq.pid             rabbit@mha
rabbit1@rabbitmq-feature_flags   rabbit2@rabbitmq-plugins-expand  rabbit@mha-feature_flags
rabbit1@rabbitmq.pid             rabbit3@rabbitmq                 rabbit@mha-plugins-expand
rabbit1@rabbitmq-plugins-expand  rabbit3@rabbitmq-feature_flags   rabbit@rabbitmq
rabbit2@rabbitmq                 rabbit3@rabbitmq.pid             rabbit@rabbitmq-feature_flags
rabbit2@rabbitmq-feature_flags   rabbit3@rabbitmq-plugins-expand  rabbit@rabbitmq-plugins-expand
➜  mnesia cd /var/log/rabbitmq/
➜  rabbitmq ls
log                           rabbit3@rabbitmq_upgrade.log        rabbit@rabbitmq.log-20210826.gz
rabbit1@rabbitmq.log          rabbit@mha.log                      rabbit@rabbitmq.log-20210902.gz
rabbit1@rabbitmq_upgrade.log  rabbit@mha.log-20210826.gz          rabbit@rabbitmq.log-20210905.gz
rabbit2@rabbitmq.log          rabbit@mha_upgrade.log              rabbit@rabbitmq_upgrade.log
rabbit2@rabbitmq_upgrade.log  rabbit@mha_upgrade.log-20210826.gz  rabbit@rabbitmq_upgrade.log-20210826.gz
rabbit3@rabbitmq.log          rabbit@rabbitmq.log                 rabbit@rabbitmq_upgrade.log-20210902.gz
#优雅的关闭
➜  rabbitmq rabbitmqctl -n rabbit1 stop
Stopping and halting node rabbit1@rabbitmq ...
➜  rabbitmq rabbitmqctl -n rabbit2 stop
Stopping and halting node rabbit2@rabbitmq ...
➜  rabbitmq rabbitmqctl -n rabbit3 stop
Stopping and halting node rabbit3@rabbitmq ...
➜  rabbitmq 


访问客户端

分别访问 用户名密码 guest

http://172.16.94.13:6001/

http://172.16.94.13:6002/

http://172.16.94.13:6003/

在客户端分别创建 Queue 发现并不互通而是相互独立的

在这里插入图片描述

在这里插入图片描述


启动效果

在这里插入图片描述

在这里插入图片描述



3.3 集群管理


安装环境

虚拟机上面 三台 centos 7 Linux 服务器

172.16.94.22 node1

172.16.94.23 node3

172.16.94.24 node2


RabbitMQ 集群部署

#环境 centos 7
#1、安装依赖:
yum install socat -y
#2、安装Erlang
#下载 
#https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
#安装
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
#rpm -ivh erlang-23.0.2-1.el8.x86_64.rpm
#3、安装RabbitMQ
#下载地址:
#https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
#安装
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
#rpm -ivh rabbitmq-server-3.8.5-1.el8.noarch.rpm
#默认安装位置
/usr/lib/rabbitmq
#跳转到可执行文件位置
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/sbin
#开启UI插件
rabbitmq-plugins enable rabbitmq_management
#首先设置主机名,这将修改/etc/hostname的内容
hostnamectl set-hostname node1
#先启动一台 rabbitmq
systemctl start rabbitmq-server
#查看启动后生成的.erlang.cookie
/var/lib/rabbitmq
➜  rabbitmq ls -a
.  ..  .erlang.cookie  mnesia
#将生成的.erlang.cookie copy到其他两台服务器
scp .erlang.cookie 172.16.94.23:`pwd`
scp .erlang.cookie 172.16.94.24:`pwd`
#分别修改所有服务器.erlang.cookie的权限组
➜  rabbitmq ll -a
总用量 8.0K
drwxr-xr-x.  3 rabbitmq rabbitmq   42 96 19:02 .
drwxr-xr-x. 40 root     root     4.0K 95 15:14 ..
-r--------.  1 root     root       20 96 19:02 .erlang.cookie
drwxr-x---.  2 rabbitmq rabbitmq    6 615 2020 mnesia
➜  rabbitmq  chown :rabbitmq .erlang.cookie 
➜  rabbitmq ll -a
总用量 8.0K
drwxr-xr-x.  3 rabbitmq rabbitmq   42 96 19:02 .
drwxr-xr-x. 40 root     root     4.0K 95 15:14 ..
-r--------.  1 root     rabbitmq   20 96 19:02 .erlang.cookie
drwxr-x---.  2 rabbitmq rabbitmq    6 615 2020 mnesia
#分别启动三台服务器的RabbitMQ
rabbitmq-server -detached 
#rabbitmqctl stop
#查看集群信息
rabbitmqctl cluster_status
#添加节点到集群 分别在node1和node3执行即可
# join_cluster默认是使用disk模式,后面可以加入参数--ram启用内存模式
# 停止Erlang VM上运行的RabbitMQ应用,保持Erlang VM的运行
rabbitmqctl stop_app
# 移除当前RabbitMQ虚拟主机中的所有数据:重置
rabbitmqctl reset  
# 将当前RabbitMQ的主机加入到rabbit@node2这个虚拟主机的集群中。一个节点也是集群。
rabbitmqctl join_cluster rabbit@node2
# 启动当前Erlang VM上的RabbitMQ应用
rabbitmqctl start_app
#rabbit@node2 节点名称 rabbit默认前缀 @后面hostname
#启动集群成功
➜  rabbitmq rabbitmqctl cluster_status
Cluster status of node rabbit@node3 ...
Running Nodes
rabbit@node1
rabbit@node2
rabbit@node3


集群修改

# 将虚拟主机(RabbitMQ的节点)rabbit@node3从集群中移除,但是rabbit@node3还保留集群信
# 还是会尝试加入集群,但是会被拒绝。可以重置rabbit@node3节点。
rabbitmqctl forget_cluster_node rabbit@node3
#修改集群名称(任意节点执行都可以)
rabbitmqctl set_cluster_name rabbit_cluster
#查看集群状态(任意节点执行都可以) 
rabbitmqctl cluster_status


添加用户

#在三个RabbitMQ节点上的任意一个添加用户,设置用户权限,设置用户标签,即可
rabbitmqctl add_user root 1234
rabbitmqctl set_permissions --vhost "/" root ".*" ".*" ".*"
rabbitmqctl set_user_tags --vhost "/" root administrator


测试集群

集群状态

在这里插入图片描述

集群 页面访问

在这里插入图片描述


注意事项

  1. 集群服务器节点之间的.erlang.cookie 一致 权限组一致为 rabbitmq
  2. /etc/hostname 和 /etc/hosts 添加对应的节点名称 且刷新下 hosts
  3. 注意检查下 hostname 要可以相互 ping 通
  4. 注意开启 management ui 不然桌面端不显示
  5. 这个集群部署坑真的多,百度了很多次


出现问题


添加节点出现问题


在这里插入图片描述


TCP connection succeeded but Erlang distribution failed

#解决方式:
sudo cp /var/lib/rabbitmq/.erlang.cookie ~/.erlang.cookie
#重启即可
#原因:
#RabbitMQ的erlang.cookie和用户的cookie冲突了,需要用rabbitmq的cookie去覆盖用户的cookie。
#参考文章
https://blog.csdn.net/u013492463/article/details/81032505


Node name (or hostname) mismatch

#分别设置主机名和对应hosts,这将修改/etc/hostname的内容
hostnamectl set-hostname node1
#参考文章
https://blog.csdn.net/mrbone11/article/details/112358903


Error when reading /var/lib/rabbitmq/.erlang.cookie: eacces

 cd /var/lib/rabbitmq
#Centos7关于.erlang.cookie权限问题的解决方式 
#以rpm安装rabbitmq的方式 
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
#以解压缩安装rabbitmq的方式 
chown rabbitmq:rabbitmq ~/.erlang.cookie
#参考文章:
https://blog.csdn.net/Guider2334/article/details/80001728


Error: mnesia_not_running

#查了一下 是由于node1的服务器MQ 没有开启
#node1服务器执行如下指令:
rabbitmqctl start_app
#然后再操作一遍 在node3服务器
rabbitmqctl join_cluster rabbit@node2
#参考文章:
https://www.jianshu.com/p/69b3384b1752



3.4 RabbitMQ 镜像集群配置



3.4.1 镜像队列的介绍

RabbitMQ 中队列的内容是保存在单个节点本地的(声明队列的节点)。跟交换器和绑定不同,它 们是对于集群中所有节点的。如此,则队列内容存在单点故障,解决方式之一就是使用镜像队列。在多 个节点上拷贝队列的副本。

每个镜像队列包含一个 master,若干个镜像。 master 存在于称为 master 的节点上。 所有的操作都是首先对 master 执行,之后广播到镜像。 这涉及排队发布,向消费者传递消息,跟踪来自消费者的确认等。 镜像意味着集群,不应该 WAN 使用。

发布到队列的消息会拷贝到该队列所有的镜像。消费者连接到 master,当消费者对消息确认之后, 镜像删除 master 确认的消息。

队列的镜像提供了高可用,但是没有负载均衡。 HTTP API 和 CLI 工具中队列对象的字段原来使用的是 slave 代表 secondaries,现在盖字段的存在仅 是为了向后兼容,后续版本会移除。

可以使用策略随时更改队列的类型,可以首先创建一个非镜像队列,然后使用策略将其配置为镜像 队列或者反过来。非镜像队列没有额外的基础设施,因此可以提供更高的吞吐率。



3.4.2 镜像队列 Master 选举

master 选举策略:

  1. **最长的运行镜像升级为主镜像,前提是假定它与主镜像完全同步。**如果没有与主服务器同步的 镜像,则仅存在于主服务器上的消息将丢失。
  2. 镜像认为所有以前的消费者都已突然断开连接。它重新排队已传递给客户端但正在等待确认的 所有消息。这包括客户端已为其发出确认的消息,例如,确认是在到达节点托管队列主节点之 前在线路上丢失了,还是在从主节点广播到镜像时丢失了。在这两种情况下,新的主服务器都 别无选择,只能重新排队它尚未收到确认的所有消息。
  3. 队列故障转移时请求通知的消费者将收到取消通知。当镜像队列发生了 master 的故障转移, 系统就不知道向哪些消费者发送了哪些消息。已经发送的等待确认的消息会重新排队
  4. 重新排队的结果是,从队列重新使用的客户端必须意识到,他们很可能随后会收到已经收到的 消息。
  5. 当所选镜像成为主镜像时,在此期间发布到镜像队列的消息将不会丢失(除非在提升的节点上 发生后续故障)。发布到承载队列镜像的节点的消息将路由到队列主服务器,然后复制到所有 镜像。如果主服务器发生故障,则消息将继续发送到镜像,并在完成向主服务器的镜像升级后 将其添加到队列中。
  6. 即使主服务器(或任何镜像)在正在发布的消息与发布者收到的确认之间失败,由客户端使用 发布者确认发布的消息仍将得到确认。从发布者的角度来看,发布到镜像队列与发布到非镜像 队列没有什么不同。


3.4.3 镜像队列的类型
ha- mode ha- params 结果
exactly count 设置集群中队列副本的个数(镜像 +master)。1 表示一个副本;也就 是 master。如果 master 不可用,行为依赖于队列的持久化机制。2 表示 1 个 master 和 1 个镜像。如果 master 不可用,则根据镜像推举策略从镜 像中选出一个做 master。如果节点数量比镜像副本个数少,则镜像覆盖 到所有节点。如果 count 个数少于集群节点个数,则在一个镜像宕机 后,会在其他节点创建出来一个镜像。将“exactly”模式与“ha-promote- on-shutdown”: “ always”一起使用可能很危险,因为队列可以在整个集 群中迁移并在关闭时变得不同步。
all (none) 镜像覆盖到集群中的所有节点。当添加一个新的节点,队列就会复制过 去。这个配置很保守。一般推荐 N/2+1 个节点。在集群所有节点拷贝镜 像会给集群所有节点施加额外的负载,包括网络 IO,磁盘 IO 和磁盘空间 使用。
nodes node names 在指定 node name 的节点上复制镜像。node name 就是在 rabbitmqctl cluster_status 命令输出中的 node name。如果有不属于集群的节点名 称,它不报错。如果指定的节点都不在线,则仅在客户端连接到的声明 镜像的节点上创建镜像。


3.4.4 镜像队列具体设置

镜像队列就是对于把 Master 的消息数据同步到 Slave 中一份,保证高可用,操作依然是由 Master 处理

镜像队列类型 ha-mode 选择节点类型,ha-params 选择节点个数

exactly 就是确认几个节点 比如集群 5 台机器,count=3 代表 1 更 M 2 个 S 剩下两机器备用就是 MS 挂了之后补上

all 不需要配置节点个数 比如集群 5 台机器,则一台 Master 其他 4 台 Slave

nodes 则是根据配置的 node name 来作为副本镜像 slave


官方文档


https://www.rabbitmq.com/ha.html


配置设置

在{

{RabbitMQ 集群部署}} 的基础上进行配置镜像队列

 # rabbitmqctl set_policy 命令
 # myhalf 设置 策略名称
 # "^queue.*3$"设置正则匹配队列名称
 # {"ha-mode":exactly","ha-params":2} exactly模式 count为2代表1主1从
 rabbitmqctl set_policy myhalf "^queue.*3$" '{"ha-mode":"exactly","ha-params":2}' 
#取消该镜像队列的设置
 rabbitmqctl set_policy myhalf "^queue.*3$" '{"ha-mode":"exactly","ha-params":1}'


其他配置

# 对/节点配置镜像队列,使用全局复制 
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
# 指定优先级,数字越大,优先级越高
rabbitmqctl set_policy --priority 1 ha-all "^" '{"ha-mode":"all"}'


测试

创建一个队列名称 queue.mirror3

在这里插入图片描述

设置策略查看具体信息 node3 为 master node1 为 slave

在这里插入图片描述

发送消息

image.png

image.png


关闭 mirror Node1

rabbitmqctl stop_app

mirro 变成了 node2,未同步,点击 synchronized 进行同步

image.png

启动 node1 ,mirror 不会再改变成 node1 了

取消该镜像队列的设置

 rabbitmqctl set_policy myhalf "^queue.*3$" '{"ha-mode":"exactly","ha-params":1}'



3.5 负载均衡-HAProxy

将客户端的连接和操作的压力分散到集群中的不同节点,防止单个或几台服务器压力过大成为访问 的瓶颈,甚至宕机。 HAProxy 是一款开源免费,并提供高可用性、负载均衡以及基于 TCP 和 HTTP 协议的代理软件,可以 支持四层、七层负载均衡,经过测试单节点可以支持 10W 左右并发连接。

LVS 是工作在内核模式(IPVS),支持四层负载均衡,实测可以支撑百万并发连接。

Nginx 支持七层的负载均衡(后期的版本也支持四层了),是一款高性能的反向代理软件和 Web 服 务器,可以支持单机 3W 以上的并发连接。

这里我们使用 HAProxy 来做 RabbitMQ 的负载均衡,通过暴露 VIP 给上游的应用程序直接连接,上游 应用程序不感知底层的 RabbitMQ 的实例节点信息。


安装 HAProxy


安装方式 一

yum install gcc -y 
tar -zxf haproxy-2.1.0.tar.gz 
cd haproxy-2.1.0 
make TARGET=linux-glibc 
make install 
mkdir /etc/haproxy

#赋权 
groupadd -r -g 149 haproxy 
# 添加用户 
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy

#创建haproxy配置文件 
touch /etc/haproxy/haproxy.cfg


安装方式 二

yum -y install haproxy
#那么haproxy默认在/usr/sbin/haproxy
#且会自动创建配置文 件/etc/haproxy/haproxy.cfg


配置 HAProxy

haproxy.cfg 配置文件替换修改

#---------------------------------------------------------------------
# Example configuration for a possible web application.  See the
# full configuration options online.
#
#   http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
#
#---------------------------------------------------------------------

#---------------------------------------------------------------------
# Global settings
#---------------------------------------------------------------------
global
    # to have these messages end up in /var/log/haproxy.log you will
    # need to:
    #
    # 1) configure syslog to accept network log events.  This is done
    #    by adding the '-r' option to the SYSLOGD_OPTIONS in
    #    /etc/sysconfig/syslog
    #
    # 2) configure local2 events to go to the /var/log/haproxy.log
    #   file. A line like the following can be added to
    #   /etc/sysconfig/syslog
    #
    #    local2.*                       /var/log/haproxy.log
    #
    log         127.0.0.1 local2

    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
    group       haproxy
    daemon

    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats

#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    log global
# tcp:实例运行于纯 TCP 模式,第 4 层代理模式,在客户端和服务器端之间将建立一个全双工的连接,
# 且不会
# 通常用于 SSL、SSH、SMTP 等应用;  
    mode tcp
    option tcplog
    option dontlognull
    retries 3
    option redispatch
    maxconn 2000
# contimeout 5s
    timeout connect 5s
# 客户端空闲超时时间为 60 秒则 HA 发起重连机制
    timeout client 60000
# 服务器端链接超时时间为 15 秒则 HA 发起重连机制
    timeout server 15000

listen rabbit_cluster
# VIP,反向代理到下面定义的三台 Real Server
    bind 172.16.94.13:5672
#配置 TCP 模式
    mode tcp
#简单的轮询
    balance roundrobin
# rabbitmq 集群节点配置
# inter 每隔五秒对 mq 集群做健康检查,2 次正确证明服务器可用,2 次失败证明服务器不可用,并且配置主备机制
server rabbitmqNode1 172.16.94.22:5672 check inter 5000 rise 2 fall 2
server rabbitmqNode2 172.16.94.24:5672 check inter 5000 rise 2 fall 2
server rabbitmqNode3 172.16.94.23:5672 check inter 5000 rise 2 fall 2
#配置 haproxy web 监控,查看统计信息
listen stats
    bind 172.16.94.13:9000
    mode http
    option httplog
# 启用基于程序编译时默认设置的统计报告
    stats enable
#设置 haproxy 监控地址为 http://node1:9000/rabbitmq-stats
    stats uri /rabbitmq-stats
# 每5s刷新一次页面
    stats refresh 5s

启动 HAProxy:

haproxy -f /etc/haproxy/haproxy.cfg

关闭

➜  ~ haproxy -f /etc/haproxy/haproxy.cfg
➜  ~ ps aux | grep haproxy
haproxy    6387  0.1  0.1  48604  1688 ?        Ss   12:30   0:00 haproxy -f /etc/haproxy/haproxy.cfg
➜  ~ kill -9 6387
➜  ~ 

检查进程状态

可以通过访问

http://172.16.94.13:9000/rabbitmq-stats

​ 查看状态

image.png


测试

可以在代码中直接测试,代码配置直接连接到 HAProxy 和监听端口上。

新建 Springboot 项目选择 rabbitmq 和 mvc


spring.application.name=haproxy_demo
spring.rabbitmq.host=mha
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=1234
spring.rabbitmq.port=5672

config 类

package com.galaxy.mqdemospringbootproxy.config;

/**
 * @author lane
 * @date 2021年09月16日 下午1:16
 */
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.haproxy",
                true,
                false,
                false,
                null);
    }

    @Bean
    public Exchange exchange() {
        return new DirectExchange("ex.haproxy",
                true,
                false,
                null);
    }

    @Bean
    public Binding binding() {
        return new Binding("queue.haproxy",
                Binding.DestinationType.QUEUE,
                "ex.haproxy",
                "key.haproxy", null);
    }

    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

}

controller 类

package com.galaxy.mqdemospringbootproxy.controller;

/**
 * @author lane
 * @date 2021年09月16日 下午1:17
 */
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.UnsupportedEncodingException;

@RestController
public class BizController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/biz/{hello}")
    public String doBiz(@PathVariable String hello) throws UnsupportedEncodingException {

        final MessageProperties properties = MessagePropertiesBuilder.newInstance()
                .setContentType("text/plain")
                .setContentEncoding("utf-8")
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("mykey", "myvalue")
                .build();

        final Message message = MessageBuilder
                .withBody(hello.getBytes("utf-8"))
                .andProperties(properties)
                .build();

        amqpTemplate.send("ex.haproxy", "key.haproxy", message);

        return "ok";
    }

}

访问 localhost:8080/biz/hello 之后查看队列信息

image.png

访问 HAProxy

http://172.16.94.13:9000/rabbitmq-stats

image.png



3.6 监控

RabbitMQ 自带的(Management 插件)管理控制台功能比较丰富,不仅提供了 Web UI 界面,还暴 露了很多 HTTP API 的能力。其中也具备基本的监控能力。此外,自带的命令行工具(例如: rabbitmqctl )也比较强大。

不过这些工具都不具备告警的能力。在实际的生产环境中,我们需要知道负载情况和运行监控状态 (例如:系统资源、消息积压情况、节点健康状态等),而且当发生问题后需要触发告警。像传统的监 控平台 Nagios、Zabbix 等均提供了 RabbitMQ 相关的插件支持。

另外,当前云原生时代最热门的 Prometheus 监控平台也提供了 rabbitmq_exporter,结合 Grafana 漂亮美观的 dashboard(可以自定义,也可以在仓库选择一些现有的),目前拉勾公司就是使用 Prometheus + Grafana 来监控 RabbitMQ 的,并实现了水位告警通知。

参考文档

https://www.rabbitmq.com/prometheus.html



结语

RabbitMQ 真的有毒,当然主要原因在于我,这一个月跌跌撞撞的学完了 RabbitMQ 正常应该是一周内,真心堕落了一个月!!!

内心仍然不够强大,意志力也没想象中坚韧,叹息一声!天作孽犹可活,自作孽不可活。

我看过 B 站 YJG 的学习观,也看过 crash couse 系列的学习,斯坦福大学教授的自控力,了解了一些生理因素,但是应该是还不够,生活的确没有想象中那么简单,还是会犯错!

如果在哪方面意志薄弱,那就不要接触在这方面的领域内的任何东西,以免陷入不想要却又走不出的境地,最好坚持自我习惯规律,不好的事情碰。你在凝望深渊的时候,深渊也在注视 👀 着你!在外围的时候诱惑是 1 意志力是 5,越靠近深渊诱惑会增加,意志力则相对减弱,最后沦陷!

明明只是去超市买袋洗衣服,却买了零食和饮料。实际上在准备去超市的时候自己已经有了预感会买垃圾食品。因为毕竟你还是对于自己有些了解,哪怕你有时有地控制不住你自己。明明说好晚餐不吃肉,但是如果餐厅里面有肉食,依然会点肉食!呵 人类!难逃真香定律。

实际上人类本质不就是一系列复杂的生物化学反应。比如爱情,比如欲望,比如快乐,比如痛苦,其本质就是一系列神经递质作用后的结果。比如熟悉的多巴胺不就是告诉你:你这样做会快乐。就比如多巴胺刚被发现的时候,实验中的小白鼠 🐁,一直刺激多巴胺的产生,在两端来回跑动直到死亡。智人和动物最大的不同是有自己的理智与智慧。当没有理智的时候就变成动物了比如下半身的动物。就比如看到公众号帅地的一篇文章他以前的同学裸聊,被骗走 50W,其中大部分是贷款。其中有句话他说 当时他什么也不知道了,别人让怎样他就怎样,完全没有了理智!

人生啊 最重要的是自己想要的什么,有什么追求!而不是被一些乱七八糟的递质控制住了自己。现在为什么有些 App 这么火呢,就是根据生物因素七情六欲来绑架你。app 火不火就是诱惑力大不大。适度游戏有益,过度游戏伤身!实际是对你自制力考验,如果不诱惑你,你怎么会玩,当你被诱惑之后,诱惑力加强。玩一把变成了玩亿把!

人生应该找到自己的追求,摒弃诱惑杂念,明心见性!知道自己想要什么,喜欢什么。而不是刷一天短视频,刷到最后空虚寂寞冷!人生有很多岔路口,找准自己的的道,别迷失自我!

找准自己的方向,坚持努力,规律学习生活,别看到岔路口就想走两步,最后不知道走到哪去了,在尽头到时候抱怨碌碌无为,蹉跎了岁月,要是要是,哪那么多要是,别看什么乱七八糟的重生文章。抱歉!人生没有重来

看完这篇文章之后,晚上依然打开 xol、x 音、x 药。真香!哪天突然觉醒卸载了之后,想一下这篇文章如何避免下次诱惑。远离任何可能的诱惑、坚持规律健康的生活!



版权声明:本文为LaneDu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。