简介
SpringCloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务组件。它提供了一个灵活的编程模型,基于Spring Boot 建立独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接可以让我们在使用时几乎无需关心具体的消息队列实现。它屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型,让开发人员能够更多的关注自己的业务。
架构模型
或许我们也可以看一个更为简洁的图
我们可以看到,每个系统只依赖于自己的
Binder
和消息中间件或者说其他系统交互, Stream 隐藏了所有消息的发送细节,对于它来说只关心三个核心模块
-
Destination Binders
:目标绑定器,告诉 Stream 你需要绑定到哪个消息队列服务的
Binder
实现即可。例如
RabbitMQ
还是
Kafka
的
Binder
?这是它的核心构建块,负责支持和提供与我们拥有的外部系统或外部消息传递系统的集成 -
Destination Bindings
:目的地绑定,把消息生产者和消费者之间的桥梁提供给 Stream 。例如对于
RabbitMQ
来说,你需要告诉 Stream 当前系统发送消息所使用的的
channel -> exchange -> routingKey -> queue
分别是什么(当然这些都是在配置文件中完成的) -
Message
:就是我们需要发送的消息
对于任何消息来说,只需要提供上述三个核心模块即可,我们无需去关心发送的细节。
直至 SpringCloud Stream 3.2.1 版本,它已经支持了几乎所有市面上流行的消息队列产品。
RabbitMQ、Kafka、RocketMQ、AWS SNS/SQS
等等,主要是因为这种一统江湖的趋势让不同的消息中间件厂商都开发了自己的绑定器
Binder
提供给 SpringCloud Stream。
初体验
下面以 RabbitMQ 为例体验一下 Stream 消息驱动开发。首先我们需要引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
复制代码
接下来我们需要在配置文件中指定相关配置,在此之前请确保你对 RabbitMQ 中的组件有一个基本的认识,否则请先阅读
RabbitMQ 基础篇
生产者
配置文件:
spring:
#消息队列地址
rabbitmq:
host: 129.204.178.49 #你的 rabbitmq 服务地址
port: 5672
username: guest
password: guest
cloud:
stream: #SpringCloud Stream 配置
bindings:
output-channel-demo: # channel 消息输出通道
destination: demo-exchange # 交换机
binder: demo-binder # 绑定器
binders:
demo-binder: #绑定器
type: rabbit # rabbitmq
rabbit:
bindings:
output-channel-demo: # channel 消息输出通道
producer: # 生产者
routing-key-expression: '''demoRoutingKey'''
复制代码
声明输出通道
/**
* 声明消息输出通道 channel
* */
public interface MessageSource {
@Output("output-channel-demo")
MessageChannel output();
}
复制代码
定义一个通道绑定类
/**
* 该注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定
* */
@EnableBinding(MessageSource.class)
public class MessageSourceHandler {
}
复制代码
接下来我们写一个集成测试发送消息即可
@Autowired
MessageSource messageSource;
/** 发送消息测试 */
@Test
public void test() {
messageSource.output().send(MessageBuilder.withPayload("测试消息").build());
}
复制代码
此时消息就成功的发送出去了,接下来我们来写消费者
消费者
配置文件
spring:
cloud:
stream:
binders:
demo-binder: #绑定器
type: rabbit #rabbitmq
rabbit:
bindings:
input-channel-demo: #消息输入通道 channel
consumer:
binding-routing-key: 'demoRoutingKey'
bindings:
input-channel-demo: #消息输入通道 channel
group: someGroup #防止多个消费者实例重复接收消息,这样一条消息只会发送给相同组的其中一个实例
destination: demo-exchange #交换机
binder: demo-binder #绑定器
rabbitmq:
host: 129.204.178.49
port: 5672
username: guest
password: guest
复制代码
声明输入通道
/**
* 声明消息输入通道 channel
* */
public interface MessageSink {
@Input("input-channel-demo")
SubscribableChannel input();
}
复制代码
声明绑定类
@EnableBinding(MessageSink.class)
public class MessageSinkHandler {
/**
* 监听 input-channel-demo 通道的消息,该 @StreamListener 注解支持 SPEL 表达式,但是被标注的方法不能有返回值
* */
@StreamListener("input-channel-demo")
public void consume(String message){
System.out.println("接受到消息:"+message);
}
}
复制代码
这样一个完整的 SpringCloud Stream 微服务消息驱动的 demo 就完成了,启动应用,消费者能成功的收到生产者发送的测试消息。
要用好 SpringCloud Stream 你必须弄懂配置文件的内容!
GitHub 源码地址
SpringCloud-Stream 入门案例
发送延迟消息
在 SpringCloud Stream 中发送延迟消息非常简单,首先我们需要在生产者、消费者的配置文件中指定交换机的类型是延迟交换机
rabbit:
bindings:
input-channel-demo: #消息输入通道 channel
consumer:
delayed-exchange: true
binding-routing-key: 'demoRoutingKey'
复制代码
生产者一样,这里省略。然后只需要在上面发送的代码中加一个 header 即可
//设置消息30秒后发送到消费者
messageSource.output().send(MessageBuilder.withPayload("测试消息")
.setHeader("x-delay",30 * 1000).build());
复制代码
如果你发送延迟消息抛出 unknown exchange type ‘x-delayed-message’ 异常,那么是因为你的 RabbitMQ 服务没有安装延迟队列插件。去官网安装一下即可
这样一个延迟消息的业务就实现了,看到这里你会发现使用 SpringCloud Stream 整合消息很简单,例如实际上对于整合
RabbitMQ
来说,几乎所有的配置都在
RabbitConsumerProperties、RabbitProductProperties
中,生产者和消费者共有的属性在它们的父类
RabbitCommonProperties
中。几乎 RabbitMQ 的所有特性和功能都可以直接在配置文件中完成。作者能力有限,其他高级特性配置详情可以参考官网
RabbitMQ Consumer Properties
但如果你真这么觉得那你就大错特错了,正如 SpringBoot ,用起来很简单可能只需要花费 20% 的精力,但是想玩的好,可能要付出 200% 的精力。SpringCloud Stream 其实包含了一系列复杂技术体系,
Spring Intergration、Spring Message、Spring AMQP
等等,其内部原理实现、组件的集成非常复杂。
我想 SpringCloud Stream 出生这么久还不广泛流行的原因之一就是,这一套技术体系涉及的东西太多了,万一生产环境出现什么疑难杂症,需要去阅读源码解决的话,这样的技术工作量是很超出预期的。
Spring Message
Spring Message
是
Spring Framework
的一个子模块,它定义了消息的统一编程模型,实际上 SpringCloud Stream 也是基于它实现的统一。
Spring Message 定义了上图的消息编程模型,提出了通道
Channel
和 消息
Message
的抽象,所有的消息都由生产者发送到输出通道
Output
中给消息中间件,然后所有的消费者都从输入通道
Input
中获取消息,而消息
Message
本身由两部分组成,消息头
header
和 消息体
payload
。
在上述的
初体验
中,我们涉及到的几个核心注解正是该模型的体现
-
@Output:代表输出通道,生产者从这发出消息
-
@Input:代表输入通道,消费者从这读取消息
-
@EnableBinding:将定义通道的接口绑定到某个
Bean
以便于我们可以通过该
Bean
操作通道进行发送和接收消息。
-
@StreamListener:订阅输入通道中的消息
SpringCloud Function 函数式编程
在 SpringCloud Stream 3.1 版本之后,你会发现
@EnableBinding
等几个核心注解被官方标注废弃了,这是因为官方推出了更新的函数式编程模型 SpringCloud Function,试图用这个组件将编程推向一个更高的层次。本篇文章不详细介绍该组件,简单介绍在 SpringCloud Stream 中如何结合 SpringCloud Function 进行消息发送和消费。
在结合 SpringCloud Function 时消息的通道命名要遵循以下约定
-
输入 :
<functionName> + -in- + <index>
-
输出 :
<functionName> + -out- + <index>
index
代表输入或输出绑定的索引,目前我们直接写
0
即可。
任务型消息
参考官方文档
Suppliers (Sources)
,我们开始写一个生产者的发送消息方法。
@Bean
public Supplier<String> source1() {
return () -> "测试定时消息";
}
复制代码
然后根据通道规则在
application.yml
中配置通道名为
source1-out-0
,再配置
spring.cloud.function.destination = source1
,指定
function
的函数方法名。
接下来我们开始写消费者,同样我们需要一个消费方法。
@Bean
public Consumer<String> sink1() {
return message -> System.out.println("收到消息:" + message);
}
复制代码
然后根据通道规则把配置文件中的通道名改为
sink1-in-0
。这样一个简单的定时消息的发送和接收就完成了,生产者会每秒给消费者发送一条消息,不得不说,SpringCloud Stream 和 SpringCloud Function 的集成真的是……太神奇了。
业务触发型消息
但通常我们更多的应用场景是业务触发发送消息,所以 SpringCloud Stream 给我们提供了一个
StreamBridge
组件。使用它发送消息只要指定通道名即可
@Test
public void test() {
streamBridge.send("source1-out-0","测试消息");
}
复制代码
这样我们就已经完成了消息的发送,消费者还是用上面的消费函数即可。
总结
不得不说集成 SpringCloud Function 之后,消息的发送和接收又迈进了一个崭新的阶段,但
<functionName> + -in- + <index>
这样的配置规约我觉得让我有些难受……甚至目前我认为 3.1 之前被废弃的注解方式也许更适合我们开发使用。