初识RabbitMQ

  • Post author:
  • Post category:其他


RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成.

RabbitMQ工作模型图

  1. Publisher

    消息的生产者
  2. Consumer

    消息的消费者,两种消费消息的模式,Pull模式(主动获取basicGet)/Push模式(被动接收basicConsume)

    RabbitMQ 中 pull 和 push 都有实现。而 kafka 和RocketMQ只有pull。
  3. Exchange

    交换器,用来接收生产者发出的消息并将这些消息路由给服务器中的队列。
  4. Binding

    绑定,用于消息队列和交换器之间的关联。
  5. Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。

    实际上 RabbitMQ 是用数据库来存储消息的,这个数据库跟RabbitMQ一样是用Erlang 开发的,名字叫 Mnesia。
  6. Routing-key

    路由键,RabbitMQ决定消息投递到哪个队列的规则。

    队列通过路由健绑定到交换器。

    消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和 绑定使用的路由键进行匹配。匹配则投递到该队列,不匹配将进入私信队列。
  7. Connection

    链接,指RabbitMQ服务器和服务建立的TCP连接。
  8. Channel

    通道,如果所有的生产者发送消息和消费者接收消息,都直接创建和释放TCP长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接。我们把它翻译成通道,或者消息信道。这样我们就可以在保持的TCP 长连接里面去创建和释放Channel,大大了减少了资源消耗。

    不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。
  9. Broker

    中介,表示消息队列服务器实体。 RabbitMQ的服务器我们把它叫做 Broker,中文翻译是代理/中介,因为MQ服务器帮助我们做的事情就是存储、转发消息。
  10. Vhost

    虚拟主机,每个虚拟主机对应一套交换机、队列、和他们之间的绑定关系。不同的VHOST中可以有同名的 Exchange 和 Queue,它们是完全透明的。Vhost 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。可以给不同的系统之间使用单独的Vhost,创建专属的用户,给用户分配对应的Vhost权限。RabbitMQ默认Vhost名字是“/”

在这里插入图片描述



消息分发机制 Exchanges and Exchange Types

我们说到 RabbitMQ 引入 Exchange 是为了实现消息的灵活路由,到底有哪些路由方式?

RabbitMQ 中一共有四种类型的交换机。

Exchange type Default pre-declared names
Direct exchange (直连) (Empty string) and amq.direct
Fanout exchange(广播) amq.fanout
Topic exchange(主题) amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

Direct Exchange 直链模式 也是默认模式 (发布订阅模式 完全匹配)

在这里插入图片描述

Fanout exchange (广播模式) Fanout交换将消息路由到绑定到其的所有队列,并且忽略了路由键。体育赛事广播、游戏排行广播、分布式系统可以广播各种状态和配置更新、群聊

在这里插入图片描述
Topic exchange (主题,规则匹配)

一个队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:

# 代表匹配 0 个或者多个单词

* 代表匹配不多不少一个单词

Headers Exchange

不常用



持久化机制

RabbitMQ 的持久化分为消息持久化、队列持久化、交换器持久化。无论是持久化消息还是非持久化消息都可以被写入磁盘。

非持久化消息在RabbitMQ宕机时会丢失,即使已写入磁盘也会删除。

队列、交换机持久化参数

boolean durable


消息持久化 参数设置

// MessageProperties.PERSISTENT_TEXT_PLAIN
 public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
}

即 deliveryMode = 2

  1. 内存控制

    RabbitMQ 中通过内存阈值参数控制内存的使用量,当内存使用超过配置的阈值 时,RabbitMQ 会阻塞客户端的连接并停止接收从客户端发来的消息,以免服务崩溃, 同时,会发出内存告警,此时客户端于与服务端的心跳检测也会失效。

    内存控制可以通过命令来设置修改参数,RabbitMQ 提供 relative 与 absolute 两种配置方式
// 百分比 fraction 建议 0.4~0.66
rabbitmqctl set_vm_memory_high_watermark <fraction>
// 绝对值,固定大小,单位为 KB、MB、GB
rabbitmqctl set_vm_memory_high_watermark absolute <value>

fraction 为内存阈值,默认是 0.4,表示 RabbitMQ 使用的内存超过系统内存的 40%时,会产生内存告警,重启会失效,永久生效需要修改配置文件。

  1. 内存换页

    在RabbitMQ内存达到内存阀值并阻塞生产者时,会尝试将内存中的消息换页到磁盘重,以释放内存空间。内存换页由换页参数控制,默认为 0.5,表示内存使用量达到内存阀值到50%是会生效进行换页。即0.4*0.5=0.2
vm_memory_high_watermark_paging_ratio=0.5

当换页阈值大于 1 时,相当于禁用了换页功能

  1. 磁盘控制

    RabbitMQ 通过磁盘阈值参数控制磁盘的使用量,当磁盘剩余空间小于磁盘阈值 时,RabbitMQ 同样会阻塞生产者,避免磁盘空间耗尽。默认50M,由于是定时检测磁盘空间,不能完全消除因磁盘耗尽而导致崩溃的可能性。一种相对谨慎的做法是将磁盘阈值大小设置与内存相等。
rabbitmqctl set_disk_free_limit <limit> # limit 为绝对值,KB、MB、GB
rabbitmqctl set_disk_free_limit mem_relative <fraction> # fraction 为相对值,建议 1.0~2.0 之间
# rabbitmq.conf
disk_free_limit.relative=1.5
# disk_free_limit.absolute=50MB
  1. 插件管理
rabbitmq-plugins list # 查看
rabbitmq-plugins enable rabbitmq_management # 开启
rabbitmq-plugins disable rabbitmq_management #关闭
  1. 配置

    查看 rabbitmq 的有效配置
rabbitmqctl environment
  1. DEMO 这里提供JAVA客户端代码

生产者 MyProducer.java

消费者 MyConsumer.java

这里的3步在生产者和消费者执行都行,也可以在RabbitMQ管理后台手动新增和绑定。

一般我们本地测试先创建消费者,再等待生产者发送消息,所以这里在消费者执行生命。

            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
			//声明队列
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列和交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
import com.rabbitmq.client.*;
public class MyProducer {
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";
    private final static String  EXCHANGE_NAME = "LIAO_EXCHANGE";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.142.129.217");
        factory.setPort(5673);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        try {
            //创建连接
            Connection conn = factory.newConnection();
            //创建通道
            Channel channel = conn.createChannel();
            //声明交换机
           // channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
			//声明队列
            //channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列和交换机
           //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
            // 发送消息
            String msg = "Hello world, Rabbit MQ";
            channel.basicPublish( EXCHANGE_NAME,"liao.routingKey", null, msg.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者

package com.liao.edu.mq.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
    private final static String QUEUE_NAME = "LIAO_QUEUE";
    private final static String  EXCHANGE_NAME = "LIAO_EXCHANGE";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.142.129.217");
        factory.setPort(5673);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
            try {
                // 建立连接
                Connection conn = factory.newConnection();
                // 创建消息通道
                Channel channel = conn.createChannel();
                //生命交换机
             channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,null);
                //生命队列
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //绑定交换机和队列
                channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"liao.routingKey");

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("consumerTag : "+consumerTag);
                        System.out.println("deliveryTag : "+envelope.getDeliveryTag());
                        System.out.println("received msg : "+new String(body,"utf-8"));
                    }
                };
                // 开始获取消息
                channel.basicConsume(QUEUE_NAME,true,consumer);
            } catch (Exception e) {
                e.printStackTrace();
            }

    }
}

申请了1个月的腾讯云服务,搭建的RabbitMQ服务。

http://43.142.129.217:15673/#/

admin/admin

过期前都可以用

  1. 延时消息插件


    GitHub下载直达


    下载rabbitmq_delayed_message_exchange-3.10.2.ez到本地

    将插件copy到RabbitMQ容器内
# 拷贝插件
docker cp /rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:/opt/rabbitmq/plugins/

# 进入容器内
docker exec -it rabbitmq bash

# 查看插件列表
rabbitmq-plugins list 

# 开启插件支持 
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出容器
ctrl + p +q

# 重启容器
docker restart rabbitmq

控制台出现x-delayed-message则安装成功

在这里插入图片描述



版权声明:本文为loveme888原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。