企业开发框架SpringBoot——04消息服务RabbitMQ

  • Post author:
  • Post category:其他



目录


为什么使用消息服务


使用消息服务的好处


1.异步处理:


2.应用解耦


​3. 流量削峰:


消息传递常用中间件


RabbitMQ 简介


消息代理过程:


RabbitMQ 工作模式介绍


1.Work queues (工作队列模式)介绍


2.Publish/Subscribe(发布订阅模式)介绍


3.Routing(路由模式)介绍


4.Topics(通配符模式)介绍


5.RPC 介绍​


安装RabbitMQ


(1)windows系统


①安装配置erlang


②安装rabbitMQ


Spring Boot整合RabbitMQ环境搭建


Publish/Subscribe(发布订阅模式)


一、基于API的方式


二、基于配置类的方式


三.基于注解的方式


Routing(路由模式)


① 使用基于注解的方式定制消息组件和消息消费者


② 消息发送者发送消息


Topics(通配符模式)


① 使用基于注解的方式定制消息组件和消息消费者


② 消息发送者发送消息


交换机分类


fanout


direct


headers


为什么使用消息服务

使用消息服务的好处

在多数应用尤其是分布式系统中,消息服务是不可或缺的重要部分,它使用起来比较简单,同时解决了不少难题,例如异步处理、应用解耦、流量削锋、分布式事务管理等,使用消息服务可以实现一个高性能、高可用、高扩展的系统。

1.异步处理:

2.应用解耦


3. 流量削峰:

消息传递常用中间件

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有 ActiveMQ(相对性能较弱),RabbitMQ,Kafka,RocketMQ。

RabbitMQ 简介

RabbitMQ 是采用Erlang 语言实现AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息,官网地址为 http://www.rabbitmq.com,目前最新版本为3.7.12。

RabbitMQ是基于AMQP协议的轻量级、可靠、可伸缩和可移植的消息代理,Spring使用RabbitMQ通过AMQP协议进行通信,在Spirng Boot中对RabbitMQ进行了集成管理。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是由RabbitMQ Technologies Ltd 开发并且提供商业支持的。取Rabbit 这样一个名字,是因为兔子行动非常迅速且繁殖起来非常疯狂, RabbitMQ 的开创者认为以此命名这个分布式软件再合适不过了。RabbitMQ Technologies Ltd 在2010 年4 月被SpringSource C VMWare的一个部门)收购,在2013 年5 月井入Pivotal.其实VMWare 、Pivotal 和EMC 本质上是一家。不同的是VMWare 是独立上市子公司,而Pivotal 是整合了EMC 的某些资源,现在并没有上市。

消息代理过程:

(1)消息发布者(Publisher)向RabbitMQ代理(Broker)指定的虚拟主机服务器(Virtual Host)发送消息

(2)虚拟机主机服务器内部的交换机(Exchange)接收消息,并将消息传递并存储到与之绑定(Binding)的消息队列(Queue)中。

(3)消息消费者(Consumer)通过一定的网络连接(Connection)与消息代理建立连接,同时为简化开支,在连接内部使用了多路复用的信道(Channel)进行消息的最终消费。

RabbitMQ 工作模式介绍

1.Work queues (工作队列模式)介绍

在Work queues工作模式中,不需要设置交换器(RabbitMQ会使用内部默认交换器进行消息转换),需要指定唯一的消息队列进行消息转递)

适用于那些较为繁重,并且可以进行拆分处理的业务,这种情况下可以分派给多个消费者轮流处理业务。

2.Publish/Subscribe(发布订阅模式)介绍

适用于进行相同业务功能处理的场合

3.Routing(路由模式)介绍

适用于进行不同类型消息分类处理的场合

4.Topics(通配符模式)介绍

适用于根据不同需求动态传递处理业务的场合

5.RPC 介绍

适用于远程服务调用的业务处理场合

安装RabbitMQ

(1)windows系统

安装rabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。(RabbitMQ与Erlang安装包都在我的资源里)

①安装配置erlang

点击刚才下载的otp_win64_23.0.exe。如果对于安装路径没有特殊要求的话,就一路next直至安装成功即可,默认安装路径为:C:\Program Files\erl-23.0。

接下来配置环境变量,常规操作,新建系统变量-键入变量名ERLANG_HOME,键入变量值:erlang安装路径。如下图:

1、下载安装软件erlang otp,next下一步

2、选择安装目录

3、选择启动文件,安装

4、安装中

5、安装完成

6.配置环境变量

新建一个系统变量:名称ERLANG_HOME,值为本机中erlang的安装目录。

然后再在用户变量PATH中添加上%ERLANG_HOME%\bin;

7.查看安装是否成功:cmd然后erl -version

②安装rabbitMQ

双击我们刚才下载的rabbitmq-server-3.8.5程序,next,install即可,此处需要注意,如果要自定义安装路径的话,路径中最好不要存在中文,会出现错误。

安装完成之后,需要我们激活rabbitmq_management

打开cmd,进到sbin目录下,运行命令

rabbitmq-plugins enable rabbitmq_management

执行成功之后会看到如下图:三个插件被启动

验证

上面的命令执行成功之后,我们就可以通过

http://localhost:15672

来访问web端的管理界面

初始可以通过用户名:guest  密码guest来登录。

这就说明我们安装成功了。

分享几条命令:

net start RabbitMQ  //启动
net stop RabbitMQ  //停止
rabbitmqctl status  //查看状态

Spring Boot整合RabbitMQ环境搭建

1、创建Spring Boot项目,添加Web依赖以及RabbitMQ依赖。

2、在pplication.properties配置文件中编写需要设置的配置属性。

# 配置RabbitMQ消息中间件连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置RabbitMQ虚拟主机路径/,默认可以省略
spring.rabbitmq.virtual-host=/

Publish/Subscribe(发布订阅模式)

主要方式:

一、基于API的方式

二、基于配置类的方式

三、基于注解的方式

一、基于API的方式

① 使用AmqpAdmin定制消息发送组件

1、在测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件

@SpringBootTest
class ChenApplicationTests {
    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void amqpAdmin() {
        //定义fanout类型的交换机
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        //定义两个默认的持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        //将队列分别与交换机进行绑定
        amqpAdmin.declareBinding(new 
 Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
    }
}

2、执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。

3、在RabbitMQ可视化管理页面的Exchanges面板中新出现了一个名称为fanout_exchange的交换器,且其类型是设置的fanout类型。单击fanout_exchange交换器进入查看。

4、切换到Queues面板页面,查看定制生成的消息队列信息

② 消息发送者发送消息

1、消息发送者发送消息,创建一个实体类User(省略setXX()、getXX()方法和toString()方法

public class User {
    private Integer id;
    private String username;
}

2、在项目测试类中使用Spring框架提供的RabbitTemplate模板类实现消息发送。

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void psubPublisher() {
	User user=new User();
	user.setId(1);
	user.setUsername("石头");
	rabbitTemplate.convertAndSend("fanout_exchange","",user);
}

3、执行上述消息发送的测试方法psubPublisher(),控制执行效果如图


因为转换器只支持Spring、byte[]和Serializable序列化后的消息

异常的两种解决方式:①将实体类实现JDK自带的Serializable序列化接口

②定制其他类型的消息转换器

定制其他类型的消息转换器:创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义了一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

4、执行上述测试方法,执行成功,查看RabbitMQ可视化管理页面Queues面板信息,两个消息队列中各自拥有一条待接收的消息。

单击某个队列的详情页面,查看具体的信息

③ 消息消费者接收消息

1、消息消费者接受消息,创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService,在该类中编写如下方法

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {

    @RabbitListener(queues = "fanout_queue_email")
    public void psubConsumerEamil(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("邮件队列接收到的消息:" + s);
    }

    @RabbitListener(queues = "fanout_queue_sms")
    public void psubConsumerSms(Message message) {
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("短信接收到消息:"+s);
    }
}

2、启动项目来监听并接收消息队列中的消息。程序启动成功后,立即查看控制台打印结果

二、基于配置类的方式

1、打开RabbitMQ消息配置类RabbitMQConfig,定义消息转换器、fanout类型的交换器、不同名称的消息队列以及将不同名称的消息队列与交换器绑定。

@Configuration
public class RabbitMQConfig2 {
    //自定义消息转换器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    //1.定义fanout类型的交换器
    @Bean
    public Exchange fanout_exchange(){
        return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
    }
    //2.定义两个不同名称的消息队列
    @Bean
    public Queue fanout_queue_email(){
        return new Queue("fanout_queue_email");
    }
    @Bean
    public Queue fanout_queue_sms(){
        return new Queue("fanout_queue_sms");
    }
    //3.将两个不同名称的消息队列与交换器进行绑定
    @Bean
    public Binding bindingEmail(){
        return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
    }
    @Bean
    public Binding bindingSms(){
        return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
    }
}

三.基于注解的方式

1、打开业务类RabbitMQService,使用@RabbitListener注解及其相关属性定制消息发送组件,在该类中编写处理邮件以及短信任务的方法

import com.chen.entity.User;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQService {

    @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_email"),
                    exchange = @Exchange(value = "fanout_exchange",type = "fanout")))
    public void psubConsumerEamil(User user) {
        System.out.println("邮件业务获取消息:"+user);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_sms"),
                    exchange = @Exchange(value = "fanout_exchange",type = "fanout")))
    public void psubConsumerSms(User user) {
        System.out.println("短信业务获取:"+user);
    }
}

Routing(路由模式)

① 使用基于注解的方式定制消息组件和消息消费者

1、打开业务类RabbitMQService,在该类中使用@RabbitListener注解及其相关属性定制Routing路由模式的消息组件,并模拟编写消息消费者接收的方法。

@Service
public class RabbitMQService {
     //路由模式消息接收,处理error级别日志信息
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"),
                    exchange = @Exchange(value = "routing_exchange",type = "direct"),
                    key = "error_routing_key"))
    public void routingConsumerError(String message){
        System.out.println("接收到Error日志:"+message);
    }

    //路由模式消息接收,处理info,error,warning级别日志信息
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"),
            exchange = @Exchange(value = "routing_exchange",type = "direct"),
            key = {"error_routing_key","info_routing_key","warning_routing_key"}))
    public void routingConsumerInfo(String message){
        System.out.println("接收到其他日志:"+message);
    }
}

② 消息发送者发送消息

在测试类中使用RabbitTemplate模板类实现Routing路由模式下的消息发送

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage() throws InterruptedException {
        rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","错误信息");
        rabbitTemplate.convertAndSend("routing_exchange","info_routing_key","正常运行");
    }

直接执行上述消息发送的测试方法routingPublisher(),控制台效果如图

将测试方法routingPublisher()中进行消息发送的参数进行修改,调整发送info级别的日志信息(注意同时修改info_routing_key路由键),再次启动该测试方法,查看控制台执行效果,如图


通过RabbitMQ可视化管理页面查看自动定制的Routing路由模式的消息组件,如图所示

Topics(通配符模式)

① 使用基于注解的方式定制消息组件和消息消费者

打开业务类RabbitMQService,在该类中使用@RabbitListener注解及其相关属性定制Topics通配符模式的消息组件,并模拟编写消息消费者接收的方法。

@Service
public class RabbitMQService {
    //通配符模式消息接收,进行邮件业务处理
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"),
                    exchange = @Exchange(value = "topic_exchange",type = "topic"),
                    key = "info.#.email.#"))
    public void topicConsumerEmail(String message){
        System.out.println("接收邮件处理信息:"+message);
    }

    //通配符模式消息接收,进行邮件业务处理
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"),
            exchange = @Exchange(value = "topic_exchange",type = "topic"),
            key = "info.#.sms.#"))
    public void topicConsumerSms(String message){
        System.out.println("接收短信处理信息:"+message);
    }
}

② 消息发送者发送消息

在项目测试类Chapter08ApplicationTests中使用RabbitTemplate模板类实现Routing路由模式下的消息发送。

@Test
public void topicPublisher() {
	rabbitTemplate.convertAndSend("topic_exchange","info.email",
		"topics send  email message");
	rabbitTemplate.convertAndSend("topic_exchange","info.sms",
                                "topics send  sms message");
	rabbitTemplate.convertAndSend("topic_exchange",
				         "info.email.sms", "topics send  email and sms message")
;}

执行测试方法topicPublisher(),先进行邮件订阅用户的消息发送

进行短信订阅用户的消息发送

同时进行邮件和短信订阅用户的消息发送方法


通过RabbitMQ可视化管理页面查看自动定制的Topics通配符模式的消息组件,使用基于注解的方式自动生成了Topics通配符模式下的消息组件,并进行了自动绑定。

交换机分类

交换机主要包括如下4种类型:

  1. Fanout exchange(扇型交换机)
  2. Direct exchange(直连交换机)
  3. Topic exchange(主题交换机)
  4. Headers exchange(头交换机)

另外RabbitMQ默认定义一些交换机:默认匿名交换机、amq.* exchanges。还有一类特殊的交换机:Dead Letter Exchange(死信交换机)

fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,所以此时routing key是不起作用的

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

binding key与routing key一样也是句点号“. ”分隔的字符串

binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。



版权声明:本文为weixin_43624810原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。