目录
为什么使用消息服务
使用消息服务的好处
在多数应用尤其是分布式系统中,消息服务是不可或缺的重要部分,它使用起来比较简单,同时解决了不少难题,例如异步处理、应用解耦、流量削锋、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高扩展的系统。
1.异步处理:
2.应用解耦
3. 流量削峰:
消息传递常用中间件
我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有 ActiveMQ(相对性能较弱),RabbitMQ,Kafka,RocketMQ。
RabbitMQ 简介
RabbitMQ 是采用Erlang 语言实现AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息,官网地址为 http://www.rabbitmq.com,目前最新版本为3.7.12。
RabbitMQ是基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理,Spring使用RabbitMQ通过AMQP协议进行通信,在Spirng Boot中对RabbitMQ进行了集成管理。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是由RabbitMQ Technologies Ltd 开发并且提供商业支持的。取Rabbit 这样一个名字,是因为兔子行动非常迅速且繁殖起来非常疯狂, RabbitMQ 的开创者认为以此命名这个分布式软件再合适不过了。RabbitMQ Technologies Ltd 在2010 年4 月被SpringSource C VMWare的一个部门)收购,在2013 年5 月井入Pivotal.其实VMWare 、Pivotal 和EMC 本质上是一家。不同的是VMWare 是独立上市子公司,而Pivotal 是整合了EMC 的某些资源,现在并没有上市。
消息代理过程:
(1)消息发布者(Publisher)向RabbitMQ代理(Broker)指定的虚拟主机服务器(Virtual Host)发送消息
(2)虚拟机主机服务器内部的交换机(Exchange)接收消息,并将消息传递并存储到与之绑定(Binding)的消息队列(Queue)中。
(3)消息消费者(Consumer)通过一定的网络连接(Connection)与消息代理建立连接,同时为简化开支,在连接内部使用了多路复用的信道(Channel)进行消息的最终消费。
RabbitMQ 工作模式介绍
1.Work queues (工作队列模式)介绍
在Work queues工作模式中,不需要设置交换器(RabbitMQ会使用内部默认交换器进行消息转换),需要指定唯一的消息队列进行消息转递)
适用于那些较为繁重,并且可以进行拆分处理的业务,这种情况下可以分派给多个消费者轮流处理业务。
2.Publish/Subscribe(发布订阅模式)介绍
适用于进行相同业务功能处理的场合
3.Routing(路由模式)介绍
适用于进行不同类型消息分类处理的场合
4.Topics(通配符模式)介绍
适用于根据不同需求动态传递处理业务的场合
5.RPC 介绍
适用于远程服务调用的业务处理场合
安装RabbitMQ
(1)windows系统
安装rabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。(RabbitMQ与Erlang安装包都在我的资源里)
①安装配置erlang
点击刚才下载的otp_win64_23.0.exe。如果对于安装路径没有特殊要求的话,就一路next直至安装成功即可,默认安装路径为:C:\Program Files\erl-23.0。
接下来配置环境变量,常规操作,新建系统变量-键入变量名ERLANG_HOME,键入变量值:erlang安装路径。如下图:
1、下载安装软件erlang otp,next下一步
2、选择安装目录
3、选择启动文件,安装
4、安装中
5、安装完成
6.配置环境变量
新建一个系统变量:名称ERLANG_HOME,值为本机中erlang的安装目录。
然后再在用户变量PATH中添加上%ERLANG_HOME%\bin;
7.查看安装是否成功:cmd然后erl -version
②安装rabbitMQ
双击我们刚才下载的rabbitmq-server-3.8.5程序,next,install即可,此处需要注意,如果要自定义安装路径的话,路径中最好不要存在中文,会出现错误。
安装完成之后,需要我们激活rabbitmq_management
打开cmd,进到sbin目录下,运行命令
rabbitmq-plugins enable rabbitmq_management
执行成功之后会看到如下图:三个插件被启动
验证
上面的命令执行成功之后,我们就可以通过
http://localhost:15672
来访问web端的管理界面
初始可以通过用户名:guest 密码guest来登录。
这就说明我们安装成功了。
分享几条命令:
net start RabbitMQ //启动
net stop RabbitMQ //停止
rabbitmqctl status //查看状态
Spring Boot整合RabbitMQ环境搭建
1、创建Spring Boot项目,添加Web依赖以及RabbitMQ依赖。
2、在pplication.properties配置文件中编写需要设置的配置属性。
# 配置RabbitMQ消息中间件连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置RabbitMQ虚拟主机路径/,默认可以省略
spring.rabbitmq.virtual-host=/
Publish/Subscribe(发布订阅模式)
主要方式:
一、基于API的方式
二、基于配置类的方式
三、基于注解的方式
一、基于API的方式
① 使用AmqpAdmin定制消息发送组件
1、在测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件
@SpringBootTest
class ChenApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void amqpAdmin() {
//定义fanout类型的交换机
amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
//定义两个默认的持久化队列,分别处理email和sms
amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
//将队列分别与交换机进行绑定
amqpAdmin.declareBinding(new
Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
}
}
2、执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。
3、在RabbitMQ可视化管理页面的Exchanges面板中新出现了一个名称为fanout_exchange的交换器,且其类型是设置的fanout类型。单击fanout_exchange交换器进入查看。
4、切换到Queues面板页面,查看定制生成的消息队列信息
② 消息发送者发送消息
1、消息发送者发送消息,创建一个实体类User(省略setXX()、getXX()方法和toString()方法
public class User {
private Integer id;
private String username;
}
2、在项目测试类中使用Spring框架提供的RabbitTemplate模板类实现消息发送。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void psubPublisher() {
User user=new User();
user.setId(1);
user.setUsername("石头");
rabbitTemplate.convertAndSend("fanout_exchange","",user);
}
3、执行上述消息发送的测试方法psubPublisher(),控制执行效果如图
因为转换器只支持Spring、byte[]和Serializable序列化后的消息
异常的两种解决方式:①将实体类实现JDK自带的Serializable序列化接口
②定制其他类型的消息转换器
定制其他类型的消息转换器:创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义了一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4、执行上述测试方法,执行成功,查看RabbitMQ可视化管理页面Queues面板信息,两个消息队列中各自拥有一条待接收的消息。
单击某个队列的详情页面,查看具体的信息
③ 消息消费者接收消息
1、消息消费者接受消息,创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService,在该类中编写如下方法
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQService {
@RabbitListener(queues = "fanout_queue_email")
public void psubConsumerEamil(Message message) {
byte[] body = message.getBody();
String s = new String(body);
System.out.println("邮件队列接收到的消息:" + s);
}
@RabbitListener(queues = "fanout_queue_sms")
public void psubConsumerSms(Message message) {
byte[] body = message.getBody();
String s = new String(body);
System.out.println("短信接收到消息:"+s);
}
}
2、启动项目来监听并接收消息队列中的消息。程序启动成功后,立即查看控制台打印结果
二、基于配置类的方式
1、打开RabbitMQ消息配置类RabbitMQConfig,定义消息转换器、fanout类型的交换器、不同名称的消息队列以及将不同名称的消息队列与交换器绑定。
@Configuration
public class RabbitMQConfig2 {
//自定义消息转换器
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
//1.定义fanout类型的交换器
@Bean
public Exchange fanout_exchange(){
return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
}
//2.定义两个不同名称的消息队列
@Bean
public Queue fanout_queue_email(){
return new Queue("fanout_queue_email");
}
@Bean
public Queue fanout_queue_sms(){
return new Queue("fanout_queue_sms");
}
//3.将两个不同名称的消息队列与交换器进行绑定
@Bean
public Binding bindingEmail(){
return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
}
@Bean
public Binding bindingSms(){
return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
}
}
三.基于注解的方式
1、打开业务类RabbitMQService,使用@RabbitListener注解及其相关属性定制消息发送组件,在该类中编写处理邮件以及短信任务的方法
import com.chen.entity.User;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQService {
@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_email"),
exchange = @Exchange(value = "fanout_exchange",type = "fanout")))
public void psubConsumerEamil(User user) {
System.out.println("邮件业务获取消息:"+user);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_sms"),
exchange = @Exchange(value = "fanout_exchange",type = "fanout")))
public void psubConsumerSms(User user) {
System.out.println("短信业务获取:"+user);
}
}
Routing(路由模式)
① 使用基于注解的方式定制消息组件和消息消费者
1、打开业务类RabbitMQService,在该类中使用@RabbitListener注解及其相关属性定制Routing路由模式的消息组件,并模拟编写消息消费者接收的方法。
@Service
public class RabbitMQService {
//路由模式消息接收,处理error级别日志信息
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"),
exchange = @Exchange(value = "routing_exchange",type = "direct"),
key = "error_routing_key"))
public void routingConsumerError(String message){
System.out.println("接收到Error日志:"+message);
}
//路由模式消息接收,处理info,error,warning级别日志信息
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"),
exchange = @Exchange(value = "routing_exchange",type = "direct"),
key = {"error_routing_key","info_routing_key","warning_routing_key"}))
public void routingConsumerInfo(String message){
System.out.println("接收到其他日志:"+message);
}
}
② 消息发送者发送消息
在测试类中使用RabbitTemplate模板类实现Routing路由模式下的消息发送
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage() throws InterruptedException {
rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","错误信息");
rabbitTemplate.convertAndSend("routing_exchange","info_routing_key","正常运行");
}
直接执行上述消息发送的测试方法routingPublisher(),控制台效果如图
将测试方法routingPublisher()中进行消息发送的参数进行修改,调整发送info级别的日志信息(注意同时修改info_routing_key路由键),再次启动该测试方法,查看控制台执行效果,如图
通过RabbitMQ可视化管理页面查看自动定制的Routing路由模式的消息组件,如图所示
Topics(通配符模式)
① 使用基于注解的方式定制消息组件和消息消费者
打开业务类RabbitMQService,在该类中使用@RabbitListener注解及其相关属性定制Topics通配符模式的消息组件,并模拟编写消息消费者接收的方法。
@Service
public class RabbitMQService {
//通配符模式消息接收,进行邮件业务处理
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"),
exchange = @Exchange(value = "topic_exchange",type = "topic"),
key = "info.#.email.#"))
public void topicConsumerEmail(String message){
System.out.println("接收邮件处理信息:"+message);
}
//通配符模式消息接收,进行邮件业务处理
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"),
exchange = @Exchange(value = "topic_exchange",type = "topic"),
key = "info.#.sms.#"))
public void topicConsumerSms(String message){
System.out.println("接收短信处理信息:"+message);
}
}
② 消息发送者发送消息
在项目测试类Chapter08ApplicationTests中使用RabbitTemplate模板类实现Routing路由模式下的消息发送。
@Test
public void topicPublisher() {
rabbitTemplate.convertAndSend("topic_exchange","info.email",
"topics send email message");
rabbitTemplate.convertAndSend("topic_exchange","info.sms",
"topics send sms message");
rabbitTemplate.convertAndSend("topic_exchange",
"info.email.sms", "topics send email and sms message")
;}
执行测试方法topicPublisher(),先进行邮件订阅用户的消息发送
进行短信订阅用户的消息发送
同时进行邮件和短信订阅用户的消息发送方法
通过RabbitMQ可视化管理页面查看自动定制的Topics通配符模式的消息组件,使用基于注解的方式自动生成了Topics通配符模式下的消息组件,并进行了自动绑定。
交换机分类
交换机主要包括如下4种类型:
- Fanout exchange(扇型交换机)
- Direct exchange(直连交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
另外RabbitMQ默认定义一些交换机:默认匿名交换机、amq.* exchanges。还有一类特殊的交换机:Dead Letter Exchange(死信交换机)
fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,所以此时routing key是不起作用的
direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。