事件驱动开发是一种异步的、用来开发高扩展应用、分布式的架构。它涉及到事件的产生、处理和消费。了解事件驱动首先要了解下面四个概念:
- Message/Event:消息或事件,需要产生、处理和消费的数据;
- Publisher/Producer:消息的发布者或生产者;
- Subsriber/Consumer:消息的订阅者或消费者;
- Message Broker:消息代理或消息中间件,是发布者和订阅者另外的第三方;通过消息代理了来转发消息,发布者和订阅者无需知道彼此的存在。
1.JMS
Spring支持ActiveMQ和Artemis作为JMS消息中间件,Artemis是ActiveMQ下一代的重新实现,本节以Artemis作为示例。
1.1 安装Apache ActiveMQ Artemis
使用docker compose安装Artemis。
stack.yml
version: '3.1'
services:
artemis:
image: vromero/activemq-artemis
restart: always
ports:
- 8161:8161 # 管理控制台端口
- 61616:61616 # 连接端口
environment:
ARTEMIS_USERNAME: wisely
ARTEMIS_PASSWORD: zzzzzz
执行命令:
$ docker-compose -f stack.yml up -d
1.2 新建应用
新建应用,信息如下:
Group:
top.wisely
Artifact:
learning-jms
Dependencies:
Spring for Apache ActiveMQ Artemis
、
Lombok
build.gradle
文件中的依赖如下:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-artemis'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
//...
}
1.3 Spring Boot的自动配置
Spring Boot对JMS提供的自动配置有:
-
ArtemisAutoConfiguration
:通过
ArtemisProperties
使用
spring.artemis.*
来配置连接Artemis的
jmsConnectionFactory
的Bean; -
JmsAutoConfiguration
:-
使用
jmsConnectionFactory
的Bean配置
jmsTemplate
的Bean; -
通过
@EnableJms
开启JMS的支持。
-
使用
1.4 示例
-
连接JMS中间件Artemis
spring: artemis: host: localhost port: 61616 user: wisely password: zzzzzz
-
消息定义
@Data @AllArgsConstructor @NoArgsConstructor public class MessageEvent implements Serializable{ private String id; private String name; }
-
开启计划任务
@SpringBootApplication @EnableScheduling public class LearningJmsApplication {}
-
发送消息
@Component public class MessageProducer { JmsTemplate jmsTemplate; public MessageProducer(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Scheduled(fixedRate = 2000) //1 public void send(){ jmsTemplate.convertAndSend("my-dest", new MessageEvent(UUID.randomUUID().toString(), "wyf")); //2 } }
-
使用
@Scheduled
每隔两秒钟发送一次消息; -
通过
JmsTemplate
的
convertAndSend(String destinationName, final Object message)
方法向指定的终点
my-dest
发送消息
-
使用
-
接受消息并应答
一般我们的消息的消费者和生产者一般在不同的应用中,本例为演示简单放置在一起。
@Component public class MessageConsumer { @JmsListener(destination = "my-dest") //1 @SendTo("confirm-dest") //2 public String consume(MessageEvent event){ System.out.println("在consume方法中处理事件" + event); return "接收到了:" + event.toString(); //3 } }
-
使用
@JmsListener
并从指定终点中获取消息; -
当处理完成后可通过
@SendTo
向指定终点触发另外的处理; -
将返回值的内容发送到
@SendTo
的指定终点。
-
使用
-
处理应答
@Component public class ConfirmConsumer { @JmsListener(destination = "confirm-dest") public void confirmReceived(String confirm){ System.out.println(confirm); } }
-
执行结果
1.5 Topic和Queue
上面的例子我们只有一个消费者,如果我们有多个消费者时:
- Queue:所有消费者会对消息进行负载轮流处理;
- Topic:所有消费者都会收到消息进行处理;
在我们使用:
jmsTemplate.convertAndSend("my-dest",
new MessageEvent(UUID.randomUUID().toString(), "wyf"));
my-dest
时
Destination
的地址,
Destination
的子接口
Queue
和
Topic
,默认自动建立地址为
my-dest
的
Queue
:
@Component
public class MessageConsumer {
@JmsListener(destination = "my-dest")
@SendTo("confirm-desc")
public String consume(MessageEvent event){
System.out.println("在consume方法中处理事件" + event);
return "接收到了:" + event.toString();
}
@JmsListener(destination = "my-dest")
public void consume2(MessageEvent event){
System.out.println("在consume2方法中处理事件" + event);
}
}
启动应用:
当我们在
application.yml
中设置了
spring.jms.pub-sub-domain
设置为
true
时,默认会为我们建立
Topic
,这时所有的消费者都能收到单独的数据:
spring:
artemis:
host: localhost
port: 61616
user: wisely
password: zzzzzz
jms:
pub-sub-domain: true
新书推荐:
我的新书《从企业级开发到云原生微服务:Spring Boot 实战》已出版,内容涵盖了丰富Spring Boot开发的相关知识
购买地址:
https://item.jd.com/12760084.html
主要包含目录有:
第一章 初识Spring Boot(快速领略Spring Boot的美丽)
第二章 开发必备工具(对常用开发工具进行介绍:包含IntelliJ IDEA、Gradle、Lombok、Docker等)
第三章 函数式编程
第四章 Spring 5.x基础(以Spring 5.2.x为基础)
第五章 深入Spring Boot(以Spring Boot 2.2.x为基础)
第六章 Spring Web MVC
第七章 数据访问(包含Spring Data JPA、Spring Data Elasticsearch和数据缓存)
第八章 安全控制(包含Spring Security和OAuth2)
第九章 响应式编程(包含Project Reactor、Spring WebFlux、Reactive NoSQL、R2DBC、Reactive Spring Security)
第十章 事件驱动(包含JMS、RabbitMQ、Kafka、Websocket、RSocket)
第11章 系统集成和批处理(包含Spring Integration和Spring Batch)
第12章 Spring Cloud与微服务
第13章 Kubernetes与微服务(包含Kubernetes、Helm、Jenkins、Istio)
多谢大家支持。