JMS和ActiveMQ的两种发送和接收示例
1 Java消息服务和JMS概述
两个系统之间或者分布式系统之间的信息通信,是我们开发中比较常见的场景,比如系统A要把信息发送给系统B,这个问题我们应该如何去处理?
1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务–JMS规范(标准);
JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者接收到消息后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。
JMS是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。
2 JMS的发展历程
JavaEE | JMS版本 |
---|---|
Java EE 8 | Java Message Service API 2.1 |
Java EE 7 | Java Message Service API 2.0 |
Java EE 6 | Java Message Service API 1.1 |
Java EE 5 | Java Message Service API 1.1 |
J2EE1.4 | Java Message Service API 1.1 |
J2EE1.3 | Java Message Service API 1.0 (不成熟,不够完善) |
3 ActiveMQ概述
ActiveMQ是Apache下的一个项目,采用Java语言开发;ActiveMQ 是一款非常流行的开源消息服务器,实现了JMS规范;官网:
http://activemq.apache.org/
4 ActiveMQ环境搭建
4.1 Linux下环境搭建
ActiveMQ运行需要Java的支持,首先需要配置Java环境变量;
1、下载ActiveMQ
2、解压:其中 -C /usr/local/ 指定把文件解压到哪里去
tar -zxvf apache-activemq-5.15.7-bin.tar.gz -C /usr/local/
解压后就安装完成了,即可使用,不需要其他操作
Java开发的服务器,都是直接解压即可使用:tomcat、maven、zookeeper、ActiveMQ、Mycat
3、切换到解压后的activemq的bin目录下
cd /usr/local/apache-activemq-5.15.7 去启动
4、切换到bin目录下,
启动:./activemq start
5、切换到bin目录下,
关闭:./activemq stop
无法关闭的问题:
1 把data目录下的所有文件及目录都删除
2 把activeMQ主目录下的 tmp 目录也删除
启动后有两个端口号,一个是web控制台:8161,一个是消息服务broker连接端口:61616
web管理控制台admin URL地址:
http://localhost:8161
默认登录账号 admin 密码 admin,注意:Linux防火前要关闭
消息服务broker URL地址 :
tcp://localhost:61616
4.2 windows下环境搭建
1、下载activeMQ windows版本的压缩包
2、解压下载下来的压缩包,解压后就可以直接使用
3、进入解压后的目录的bin下面
4、在地址栏输入 cmd 后确定
5、在dos窗口里面输入: activemq start 进行启动
6、关闭:activemq stop 或者Ctrl + c
5 Java消息服务JMS整体设计结构
基本要素:
1、生产者producer
2、消费者consumer
3、消息服务broker
交互模型
JMS两种消息传送模式:
1、点对点( Point-to-Point):专门用于使用队列Queue传送消息;
2、发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。
基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。 (1 对 1)
基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。(1 对 多)
6 Java消息服务JMS API总体概览
JMS API可以分为3个主要部分:
1、公共API:可用于向一个队列或主题发送消息或从其中接收消息;
2、点对点API:专门用于使用队列Queue传送消息;
3、发布/订阅API:专门用于使用主题Topic传送消息。
1、JMS公共API
在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:
ConnectionFactory
Connection
Session
Message
Destination
MessageProducer
MessageConsumer
它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。
2、JMS点对点API
点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:
QueueConnectionFactory
QueueConnection
QueueSession
Message
Queue
QueueSender
QueueReceiver
从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。
3、JMS发布/订阅API
发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口:
TopicConnectionFactory
TopicConnection
TopicSession
Message
Topic
TopicPublisher
TopicSubscriber
由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。
7 ActiveMQ点对点发送与接收消息示例
1、加入jms 和 activemq 的相关依赖
2、参考样例代码编写一个消费发送者 和 一个消息接收者
3、运行发送者和接收者的代码,在activeMQ的web控制台观察消息数据
<dependencies>
<!-- JMS规范的jar依赖 -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- activeMQ对jms具体实现的jar依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.8</version>
</dependency>
<!--slf4j日志的简易实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
public class QueueSender {
public static final String BROKER_URL = "tcp://localhost:61616";
public static final String DESTINATION = "myQueue";
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
//2、创建一个连接
connection = connectionFactory.createConnection();
//3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4、创建消息
Message message = session.createTextMessage("Hello,ActiveMQ");
//5、消息目的地
Destination destination = session.createQueue(DESTINATION);
//6、消息生产者
producer = session.createProducer(destination);
//7、发送消息
producer.send(message);
} catch (Exception e) {
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//连接的关闭
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
public class QueueReceiver {
public static final String BROKER_URL = "tcp://localhost:61616";
public static final String DESTINATION = "myQueue";
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//2、创建一个连接
connection = connectionFactory.createConnection();
//接收消息多一个步骤:把连接启动
connection.start();
//3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4、创建消息
Message message = session.createTextMessage("Hello,ActiveMQ");
//5、消息目的地
Destination destination = session.createQueue(DESTINATION);
//6、消息消费者
messageConsumer = session.createConsumer(destination);
//7、接收消息:receive方法是一个阻塞方法,它会一直等待,直到接收到消息
// receive(Long 超时毫秒数) receiveNoWait()不等待
Message receiveMessage = messageConsumer.receive();
if (receiveMessage instanceof TextMessage) {
String text = ((TextMessage) message).getText();
System.out.println("接收到的消息为:" + text);
}
} catch (Exception e) {
} finally {
if ( messageConsumer!= null) {
try {
messageConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//连接的关闭
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
接收到的消息为:Hello,ActiveMQ
Process finished with exit code 0
8 ActiveMQ发布与订阅示例
public class TopicPublisher {
public static final String BROKER_URL = "tcp://localhost:61616";
public static final String DESTINATION = "myTopic";
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
//2、创建一个连接
connection = connectionFactory.createConnection();
//3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4、创建消息
Message message = session.createTextMessage("Hello,ActiveMQ");
//5、消息目的地
Destination destination = session.createTopic(DESTINATION);
//6、消息生产者
producer = session.createProducer(destination);
//7、发送消息
producer.send(message);
} catch (Exception e) {
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//连接的关闭
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
public class TopicSubscriber {
public static final String BROKER_URL = "tcp://localhost:61616";
public static final String DESTINATION = "myTopic";
public static void main(String[] args) {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//2、创建一个连接
connection = connectionFactory.createConnection();
//接收消息多一个步骤:把连接启动
connection.start();
//3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4、创建消息
Message message = session.createTextMessage("Hello,ActiveMQ");
//5、消息目的地
Destination destination = session.createTopic(DESTINATION);
//6、消息消费者
messageConsumer = session.createConsumer(destination);
//7、接收消息:receive方法是一个阻塞方法,它会一直等待,直到接收到消息
// receive(Long 超时毫秒数) receiveNoWait()不等待
while (true) {
Message receiveMessage = messageConsumer.receive();
if (receiveMessage instanceof TextMessage) {
String text = ((TextMessage) message).getText();
//可以进行业务逻辑的处理
System.out.println("接收到的消息为:" + text);
}
}
} catch (Exception e) {
} finally {
if ( messageConsumer!= null) {
try {
messageConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//连接的关闭
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
注意:
发布订阅模式必须先启动订阅者,再启动发布者,否则消息接收不到
9 Queue与Topic比较
Queue模式,如果消息没有接收,消息存放在服务器,重启后消息还在。
Topic模式,没有被接收的消息,重启服务器后,消息就不存在。
10 拉模式与推模式
点对点消息,如果没有消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。在这种模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得 (拉模式)。
pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。