RabbitMQ控制台
在已经安装好RabbitMQ后,访问http://localhost:15672/即可打开RabbitMQ官方提供的控制台。初始账号和密码都是:guest
进入控制台看到如下界面,可以通过通知台查看连接情况,信道以及交换机和队列的详细信息,还可以通过Admin来添加新管理员并分配权限。具体操作就不再演示,本文主要介绍使用Java语言如何来操作RabbitMQ。
Java语言操作RabbitMQ
简单看一下RabbitMQ的工作原理图,无论是生产者还是消费者都需要与RabbitMQ建立连接Connection,但是可以看到一个Connection内部有很多个Channel,实际上生产者和消费者是通过Channel与Broker(RabbitMQ Server本身)进行交互的。所以操作RabbitMQ首先需要获取Connection再创建Channel,主要的操作都是通过Channel来完成的。
简单介绍一下各个关键词的含义:
-
Broker
接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。 -
VirtualHost
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。 -
Connection
publisher/consumer 和 broker 之间的 TCP 连接 -
Channel
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP Method 包含了 channel id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP Connection 的开销。 -
Exchange
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) -
Queue
消息最终被送到这里等待 consumer 取走 -
Binding
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查询表中,用于 message 的分发依据
引入jar包
依赖的版本可以根据自己情况选择,我这里使用的5.8.0
<!--rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
获取连接,获取信道
创建连接通常,配置必要属性,获取连接,在通过连接创建信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);// 默认就是5672,可以不配置
factory.setUsername("guest");
factory.setPassword("guest");
// 虚拟机,MQ支持虚拟机,创建后相当于一个全新的MQ
factory.setVirtualHost("/poxiao");
// 配置完参数后,创建一个连接
Connection connection = factory.newConnection();
// 通过连接创建一个信道,接下来的操作都是通过信道来完成
Channel channel = connection.createChannel();
在实际应用中,创建连接的操作会被封装成一个工具类,并不是每次获取Channel都需要重新创建连接,使用已经创建好的Connection可以有效的减少连接带来的开销。
通过信道来声明交换机\队列,以及绑定等操作
String EXCHANGE_NAME = "poxiao_queue";
String QUEUE_NAME = "poxiao_queue";
/**
* 声明一个DIRECT类型的交换机
* 参数:
* 1、交换机名
* 2、交换机类型,有四种,分别为 DIRECT, FANOUT, TOPIC, HEADERS;
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/**
* 声明队列
* 参数:
* 1、队列名
* 2、队列是否支持持久化
* 3、是否为独占队列,只支持当前连接访问
* 4、是否自动删除,当没有服务使用此队列时自动删除,可以用来做临时队列
* 5、队列的其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 队列与交换机建立绑定关系
* 参数:
* 1、队列名
* 2、交换机名
* 3、routingKey叫路由key,也叫绑定key。解释:消息先到达交换机,再由交换机通过某种规则分配给指定的队列,
* 对于DIRECT类型的交换机,消息生产者在发送消息时会指定消息发送到哪个具体的routingKey,交换机通过routingKey再发送到指定队列中。
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
发送消息和接收消息
/**
* 生产者只需要将消息投递给交换机,而交换机负责将消息发送给对应的队列
* 参数:
* 1、交换机名
* 2、routingKey,交换机根据routingKey去寻找相匹配的队列,并将消息发送到队列中
* 3、消息属性
* 4、消息体
*/
channel.basicPublish(EXCHANGE_NAME, "key1", null, "你好!".getBytes(StandardCharsets.UTF_8));
/**
* 消费者负责消费指定队列中的消息
* 参数:
* 1、队列名
* 2、是否自动确认,true代表自动确认,消费者接收到消息后会自动相MQ发送确认信号,表示我已经接收到消息了,你可以把消息删除了。
* 3、接收消息的回调函数
* 4、当一个消费者取消订阅时的回调
*/
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(new String(message.getBody(),StandardCharsets.UTF_8));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println(consumerTag);
}
});
这样是简单的实现,对于高级部分,如:
如何保证消息不丢失可以看:
RabbitMQ如何保证消息不丢失
交换机的详细使用可以看:
RabbitMQ交换机的四种类型
MQ的高级用法可以看:
RabbitMQ的死信队列和延迟队列