一、RabbitMQ介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景:
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ呢?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
二、使用MQ的好处
2.1实现异步处理
同步的通信:发出一个调用请求之后,在没有得到结果之前,就不返回。由调用者主动等待这个调用的结果。
异步通信:调用在发出之后,这个调用就直接返回了,所以没有返回结果。也就是说,当一个异步过程调用发出后,调用者不会马上得到结果。而是在调用发出后,
被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
2.2实现解耦
耦合是系统内部或者系统之间存在相互作用,相互影响和相互依赖。在我们的分布式系统中,一个业务流程涉及多个系统的时候,他们之间就会形成一个依赖关系。
在传统的通信方式中,订单系统发生了退货的动作,那么要依次调用所有下游系统的 API,比如调用库存系统的 API 恢复库存,因为这张火车票还要释放出去给其他乘客购买;调用支付系统的 API,不论是支付宝微信还是银行卡,要把手续费扣掉以后,原路退回给消费者;调用通知系统 API 通知用户退货成功。
// 伪代码 public void returnGoods(){ stockService.updateInventory (); payService.refund(); noticeService.notice();
这个过程是串行执行的,如果在恢复库存的时候发生了异常,那么后面的代码都不会执行。由于这一系列的动作,恢复库存,资金退还,发送通知,本质上没有一个严格的先后顺序,也没有直接的依赖关系,也就是说,只要用户提交了退货的请求,后面的这些动作都是要完成的。库存有没有恢复成功,不影响资金的退还和发送通知。
使用多线程
多线程或者线程池是可以实现的,但是每一个需要并行执行的地方都引入线程,又会带来线程或者线程池的管理问题。所以使用MQ
订单系统只需要把退货的消息发送到消息队列上,由各个下游的业务系统自己创建队列,然后监听队列消费消息。
在这种情况下订单系统里面就不需要配置其他系统的 IP、端口、接口地址了,因为它不需要关心消费者在网络上的什么位置,所以下游系统改 IP 没有任何影响。
甚至不需要关心消费者有没有消费成功,它只需要把消费发到消息队列的服务器上就可以了。这样,我们就实现了系统之间依赖关系的解耦。
2.3实现流量削锋
在很多的电商系统里面,有一个瞬间流量达到峰值的情况,比如京东的 618,淘宝的双 11,还有小米抢购。普通的硬件服务器肯定支撑不了这种百万或者千万级别的并发量,就像 2012 年的小米一样,动不动服务器就崩溃。如果通过堆硬件的方式去解决,那么在流量峰值过去以后就会出现巨大的资源浪费。那要怎么办呢?如果说要保护我们的应用服务器和数据库,
限流也是可以的,但是这样又会导致订单的丢失,没有达到我们的目的。
引入MQ,MQ是队列,一定有队列的特性,(先进先出)就可以先把所有的流量承接下来,转换成 MQ 消息发送到消息队列服务器上,业务层就可以根据自己的消费速率去处理这些消息,
处理之后再返回结果。就像我们在火车站排队一样,大家只能一个一个买票,不会因为人多就导致售票员忙不过来。如果要处理快一点,大不了多开几个窗口(增加几个消费者)。
总结起来:
1) 对于数据量大或者处理耗时长的操作,我们可以引入 MQ 实现异步通信,减少客户端的等待,提升响应速度。
2) 对于改动影响大的系统之间,可以引入 MQ 实现解耦,减少系统之间的直接依赖。
3) 对于会出现瞬间的流量峰值的系统,我们可以引入 MQ 实现流量削峰,达到保护应用和数据库的目的。
三、 RabbitMQ 中的概念模型
MQ的本质:消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息队列模型, 可以在分布式环境下扩展进程的通信
MQ的特点:
1、 是一个独立运行的服务。生产者发送消息,消费者接收消费,需要先跟服务器建立连接。
2、 采用队列作为数据结构,有先进先出的特点。
3、 具有发布订阅的模型,消费者可以获取自己需要的消息。
消息模型:
所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,
最后将消息发送到监听的消费者。
RabbitMQ的基本概念
下图是RabbitMQ的基本结构:
组成部分说明如下:
- Broker :消息队列服务,此进程包括两个部分:Exchange和Queue。
- Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。队列使用绑定键(Binding Key)跟交换机建立绑定关系。
- Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方,它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Connection:无论是生产者发送消息,还是消费者接收消息,都必须跟Broker之间建立一个连接,这个是TCP长连接
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Virtual Host理解如下图:
相关名词:
包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
ConnectionFactory
(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel
(信道):消息推送使用的通道;
Exchange
(交换器):用于接受、分配消息;
Queue
(队列):用于存储生产者的消息;
RoutingKey
(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):
用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
消息发布接收流程:
—–发送消息—–
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
—-接收消息—–
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
四、 下载安装
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open
Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需
要安装Erlang/OTP,并保持版本匹配,如下图:
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
1)下载erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理员方式运行此文件,安装。
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添
加%ERLANG_HOME%\bin;
2)安装RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理员方式运行此文件,安装
3)启动
- 安装成功后会自动创建RabbitMQ服务并且启动。
从开始菜单启动RabbitMQ,完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动
2.如果没有开始菜单则进入安装目录下sbin目录手动启动:
1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3) 注意事项:
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang
搜索RabbitMQ、ErlSrv,将对应的项全部删除。
五、java操作队列
1、消息队列RabbitMQ的五种形式队列
1).点对点(简单)的队列
2).工作(公平性)队列模式
3.发布订阅模式
4.路由模式Routing
5.通配符模式Topics
2、简单队列
1)功能:一个生产者P发送消息到队列Q,一个消费者C接收
P表示为生产者 、C表示为消费者 红色表示队列。
点对点模式分析:
Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
封装
Connection:
/** * 封装Connection */ public class MQConnectionUtils { public static Connection getConnection(){ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务器地址 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); //设置用户名 factory.setUsername("guest"); //设置密码 factory.setPassword("guest"); //设置vhost factory.setVirtualHost("/admin_yehui"); try { //创建连接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
参数详解:
1)声明交换机的参数
String type:交换机的类型,direct, topic, fanout 中的一种。
boolean durable:是否持久化,代表交换机在服务器重启后是否还存在。
2)声明队列的参数
boolean durable:是否持久化,代表队列在服务器重启后是否还存在。
boolean exclusive:是否排他性队列。排他性队列只能在声明它的 Connection中使用(可以在同一个 Connection 的不同的 channel 中使用),连接断开时自动删除。
boolean autoDelete:是否自动删除。如果为 true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。
Map<String, Object> arguments:队列的其他属性
3)消息属性 BasicProperties
以下列举了一些主要的参数:
生产者:
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接,是否排他性队列。排他性队列只能在声明它的 Connection中使用(可以在同一个 Connection 的不同的 channel 中使用), 连接断开时自动删除。
* 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); String msg = "test_yehui_rabbitmq"; /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送体:"+msg); channel.close(); connection.close(); } }