pls-00302: 必须声明 组件_Spring AMQP RabbitAdmin 声明式配置

  • Post author:
  • Post category:其他


Spring AMQP  RabbitAdmin 声明式配置

Spring AMQP 中 比较重要的一个组件 就是 RabbitAdmin , 它封装了关于 声明  Queue ,Exchage , Binding 等等,也可以删除和清空队列数据等等操作,内部是通过RabbitTempate 去发送的请求,调用的还是原生的 amqp-client 的API 操作的,这里面有一定的坑 需要记录一下


概述

Spring 有很多不同的项目,其中就有对AMQP的支持

spring-amqp是对AMQP协议的抽象实现,

而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ

以后可能有其他的基于AMQP协议的 消息中间件(otherbbit)的时候,那么Spring可能会提供 spring-otherbbit 具体实现。


spring-amqp:

提供了如 AmqpAdmin AmqpTemplate 等等

a37ff23babca4da6192a1a313b6c0d05.png


spring-rabbit:

提供了具体的实现  RabbitAdmin 和 RabbitTemplate 等等。。

8de483483727bfa2f439f10cdc74c981.png


通过上面的概述以及图片,大概能够了解 spring-amqp 抽象层 和 spring-rabbit 的关系了


1.Spring AMQP 整合 RabbitMQ

虽然现在都是SpringBoot 直接整合RabbitMQ 但是SpringBoot 内部都是通过 Spring提供的AMQP 去整合的,所以我们一步一步来,先不直接使用SpringBoot自动配置,先自己动手整合


1.1 引入Spring AMQP 依赖

<dependency>    <groupId>org.springframework.bootgroupId>    <artifactId>spring-boot-starter-amqpartifactId>dependency>

查看 依赖数可以发现,spring-boot-starter-amqp 它依赖了  spring-rabbit  它又依赖了操作rabbitmq的   amqp-client 原生API ,和 spring-amqp 抽象层

373b2c7d2b9fae3c6be32ef2e4e5164b.png


1.2 配置RabbitConfig

由于我们是 手动去使用 spring-amqp 去整合 rabbitmq ,那么是需要一些基本配置的如 ConnectionFactory 等等。

添加ConnectionFactory ,使用spring-amqp提供的 CachingConnectionFactory 对原生的ConnectionFactory 进行了封装和维护 具有缓存功能

    @Bean    public ConnectionFactory connectionFactory() {        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();        cachingConnectionFactory.setHost("127.0.0.1");        cachingConnectionFactory.setPort(5672);        cachingConnectionFactory.setUsername("johnny");        cachingConnectionFactory.setPassword("johnny");        cachingConnectionFactory.setVirtualHost("/");        return cachingConnectionFactory;    }    @Bean    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);        //关键!        rabbitAdmin.setAutoStartup(true);        return rabbitAdmin;    }


2. 组件 RabbitAdmin

RabbitAdmin 提供了简单的方式 声明 队列 交换机 等 操作


2.1 Api 分析

先看看 AmqpAdmin的 方法 ,可以看到 抽象层定义了很多 声明方法

f899e50b68eb53b70e1e6c0602819d1a.png

RabbitAdmin  实现了AmqpAdmin 实现了 这些方法,底层使用 rabbitmq 的 amqp-client 原生api 去操作的

a928ad8c8dcd8397cbe9cccac3f57fae.png


2.2 基本使用

可以看到 直接使用 rabbitAdmin的api 声明 queue exchange 和 binding 还是很简单的

@Testpublic void testAdmin() {    Queue queue = new Queue("hello-queue");    //声明一个队列    rabbitAdmin.declareQueue(queue);    //有TopicExchange 有 DirectExchange 和 FanoutExchange 等等     TopicExchange topicExchange = new TopicExchange("topic.exchange001", true, false);    //声明一个交换机    rabbitAdmin.declareExchange(topicExchange);    //声明 一个交换机和队列的绑定     rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with("hello.*"));}


2.3 挑一个 declareQueue 源码看看


可以看到内部确实是使用 rebbitTemplate 去调用的

15d82a2516d49b5c055891d6f4cb2389.png

内部使用的是 原生的 api  去声明队列

cd3588b52b46608913b0a1d528df6dc5.png


3. RabbitAdmin 声明式配置 Queue Exchange Bingding

上面的基本使用是 使用 rabbitAdmin 的 手动声明方式 去声明 Queue Exchange 等等。。下面来说基于 @Bean 配置 的方式

可以直接通过 @Bean 配置的方式去 声明 Queue Exchange 等。。

@Beanpublic Queue queue001() {    return new Queue("queue-001", true);}@Beanpublic TopicExchange exchange001() {    return new TopicExchange("exchange-001", true, false);}@Beanpublic Binding binding001() {    return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");}

不过这里面有点小坑 以上配置并不能成功生效,必须配合 SimpleMessageListenerContainer 才行 ,下面来分析一下RabbitAdmin的源码 来分析一下 为什么@Bean的方式 也能够自动声明


4. RabbitAdmin 源码分析 以及 问题


4.1 实现 InitializingBean (afterPropertiesSet)

可以看到RabbitAdmin 实现了InitializingBean 那么它肯定有个 afterPropertiesSet 去在Bean初始化结束后调用的

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,      BeanNameAware, InitializingBean 


4.2重写  AmqpAdmin  的initalize()

在上面 RabbitAdmin 组件 Api分析那块 可以看到 AmqpAdmin 抽象层提供了一个 initalize() 这里被实现了,并且在afterPropertiesSet中被调用

8fd8e17dfb8de01199434bf5b4b48eca.png


4.3 获取IOC容器中的 Queue Exchange和 Binding 的Bean对象

ed7ef2c160c28822651aa18e9863a8d0.png


4.4 使用RabbitTemplate去 声明

经过上面的源码分析就能看到 RabbitAdmin 如何基于@Bean配置 Queue Exchange等 的

6de9f1bbe026569f5cbd85ab89d65440.png


4.5 存在的问题(依赖SimpleMessageListenerContainer )

那么这个 Lamda 编写的 ConnectionListener 什么时候被调用的呢 ,这就需要引入 SimpleMessageListenerContainer

de14e4505f4a6c64cd4996dfbb578202.png


SimpleMessageListenerContainer

继承 AbstractMessageListenerContainer ,而 AbstractMessageListenerContainer a

实现了 Lifecycle ,它有 start 方法,这里AbstractMessageListenerContainer重写了它  ,内部调用 checkMismatchedQueues()方法

baa34d1efe72f7ff7c33085c691f7be8.png

49726ff5fc4a61a3d6bbec06bc94f96e.png

内部 调用 createConnection ,调用所有监听器的 onCreate 方法 而上面的 Lambda 表达式就会被调用 ,那个Lambda 表达式就是 一个 ConnectionListener

d8d09b8ea4031d10002a43ff4e9550a9.png

最终会调用 这里的 initalize() 去根据@Bean 声明 Queue Exchange 和 Binding

6996ef259b5d709ac7484d85a7575e1e.png

所以容器中 一定要有 SimpleMessageListenerContainer 使用它的 Lifecycle 的 start -> checkMismatchedQueues ->

getConnectionFactory().createConnection() -> getConnectionListener().onCreate(this.connection) -> initialize();

@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);    simpleMessageListenerContainer.addQueues(queue001());    return simpleMessageListenerContainer;}


总结

本篇主要讲解了 1.spring-amqp 和 rabbitmq 的整合它们的关系和配置 2. RabbitAdmin的 Api 和 基本使用

3.RabbitAdmin的声明式配置 方式 4.RabbitAdmin的声明式配置的源码 ,以及  依赖的SimpleMessageListenerContainer 的源码部分 ,有了这些基础 后续再学习SpringBoot整合RabbitMq,去看它的 AutoConfig 等等 源码会更加清晰。



版权声明:本文为weixin_39608988原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。