ActiveMQ学习1-JMS和ActiveMQ的两种发送和接收示例

  • Post author:
  • Post category:其他




1 Java消息服务和JMS概述

两个系统之间或者分布式系统之间的信息通信,是我们开发中比较常见的场景,比如系统A要把信息发送给系统B,这个问题我们应该如何去处理?

1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务–JMS规范(标准);

JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者接收到消息后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。

JMS是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。



2 JMS的发展历程

JavaEE JMS版本
Java EE 8 Java Message Service API 2.1
Java EE 7 Java Message Service API 2.0
Java EE 6 Java Message Service API 1.1
Java EE 5 Java Message Service API 1.1
J2EE1.4 Java Message Service API 1.1
J2EE1.3 Java Message Service API 1.0 (不成熟,不够完善)



3 ActiveMQ概述

ActiveMQ是Apache下的一个项目,采用Java语言开发;ActiveMQ 是一款非常流行的开源消息服务器,实现了JMS规范;官网:

http://activemq.apache.org/



4 ActiveMQ环境搭建



4.1 Linux下环境搭建

ActiveMQ运行需要Java的支持,首先需要配置Java环境变量;

1、下载ActiveMQ

2、解压:其中 -C /usr/local/ 指定把文件解压到哪里去

tar -zxvf apache-activemq-5.15.7-bin.tar.gz -C /usr/local/ 

解压后就安装完成了,即可使用,不需要其他操作

Java开发的服务器,都是直接解压即可使用:tomcat、maven、zookeeper、ActiveMQ、Mycat

3、切换到解压后的activemq的bin目录下

cd /usr/local/apache-activemq-5.15.7 去启动

4、切换到bin目录下,

启动:./activemq start

5、切换到bin目录下,

关闭:./activemq stop
无法关闭的问题:
1 把data目录下的所有文件及目录都删除
2 把activeMQ主目录下的 tmp 目录也删除

启动后有两个端口号,一个是web控制台:8161,一个是消息服务broker连接端口:61616

web管理控制台admin URL地址:

http://localhost:8161

默认登录账号 admin 密码 admin,注意:Linux防火前要关闭

消息服务broker URL地址 :

tcp://localhost:61616



4.2 windows下环境搭建

1、下载activeMQ windows版本的压缩包

2、解压下载下来的压缩包,解压后就可以直接使用

3、进入解压后的目录的bin下面

4、在地址栏输入 cmd 后确定

5、在dos窗口里面输入: activemq start 进行启动

6、关闭:activemq stop 或者Ctrl + c



5 Java消息服务JMS整体设计结构

基本要素:

1、生产者producer
2、消费者consumer
3、消息服务broker

交互模型

在这里插入图片描述

JMS两种消息传送模式:

1、点对点( Point-to-Point):专门用于使用队列Queue传送消息;

2、发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。

基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。 (1 对 1)

基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。(1 对 多)



6 Java消息服务JMS API总体概览

JMS API可以分为3个主要部分:
1、公共API:可用于向一个队列或主题发送消息或从其中接收消息;
2、点对点API:专门用于使用队列Queue传送消息;
3、发布/订阅API:专门用于使用主题Topic传送消息。

1、JMS公共API

在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:

ConnectionFactory
Connection
Session
Message
Destination
MessageProducer
MessageConsumer

它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。

2、JMS点对点API

点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:

QueueConnectionFactory
QueueConnection
QueueSession
Message
Queue
QueueSender
QueueReceiver

从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。

3、JMS发布/订阅API

发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口:

TopicConnectionFactory
TopicConnection
TopicSession
Message
Topic
TopicPublisher
TopicSubscriber

由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。



7 ActiveMQ点对点发送与接收消息示例

在这里插入图片描述

1、加入jms 和 activemq 的相关依赖
2、参考样例代码编写一个消费发送者 和 一个消息接收者
3、运行发送者和接收者的代码,在activeMQ的web控制台观察消息数据
<dependencies>
        <!-- JMS规范的jar依赖 -->
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>

        <!-- activeMQ对jms具体实现的jar依赖 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.8</version>
        </dependency>

        <!--slf4j日志的简易实现-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
</dependencies>
public class QueueSender {
    public static final String BROKER_URL = "tcp://localhost:61616";
    public static final String DESTINATION = "myQueue";

    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //2、创建一个连接
            connection = connectionFactory.createConnection();

            //3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //4、创建消息
            Message message = session.createTextMessage("Hello,ActiveMQ");

            //5、消息目的地
            Destination destination = session.createQueue(DESTINATION);

            //6、消息生产者
            producer = session.createProducer(destination);

            //7、发送消息
            producer.send(message);

        } catch (Exception e) {

        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            //连接的关闭
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

在这里插入图片描述

public class QueueReceiver {
    public static final String BROKER_URL = "tcp://localhost:61616";
    public static final String DESTINATION = "myQueue";

    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            //2、创建一个连接
            connection = connectionFactory.createConnection();
            //接收消息多一个步骤:把连接启动
            connection.start();

            //3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //4、创建消息
            Message message = session.createTextMessage("Hello,ActiveMQ");

            //5、消息目的地
            Destination destination = session.createQueue(DESTINATION);

            //6、消息消费者
            messageConsumer = session.createConsumer(destination);

            //7、接收消息:receive方法是一个阻塞方法,它会一直等待,直到接收到消息
            //   receive(Long 超时毫秒数) receiveNoWait()不等待
            Message receiveMessage = messageConsumer.receive();

            if (receiveMessage instanceof TextMessage) {
                String text = ((TextMessage) message).getText();
                System.out.println("接收到的消息为:" + text);
            }

        } catch (Exception e) {

        } finally {
            if ( messageConsumer!= null) {
                try {
                    messageConsumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            //连接的关闭
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
接收到的消息为:Hello,ActiveMQ

Process finished with exit code 0

在这里插入图片描述



8 ActiveMQ发布与订阅示例

public class TopicPublisher {
    public static final String BROKER_URL = "tcp://localhost:61616";
    public static final String DESTINATION = "myTopic";

    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //2、创建一个连接
            connection = connectionFactory.createConnection();

            //3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //4、创建消息
            Message message = session.createTextMessage("Hello,ActiveMQ");

            //5、消息目的地
            Destination destination = session.createTopic(DESTINATION);

            //6、消息生产者
            producer = session.createProducer(destination);

            //7、发送消息
            producer.send(message);

        } catch (Exception e) {

        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            //连接的关闭
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class TopicSubscriber {
    public static final String BROKER_URL = "tcp://localhost:61616";
    public static final String DESTINATION = "myTopic";

    public static void main(String[] args) {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            //2、创建一个连接
            connection = connectionFactory.createConnection();
            //接收消息多一个步骤:把连接启动
            connection.start();

            //3、创建一个Session,第一个参数表示是否为事务消息,第二个参数表示消息确认方式
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //4、创建消息
            Message message = session.createTextMessage("Hello,ActiveMQ");

            //5、消息目的地
            Destination destination = session.createTopic(DESTINATION);

            //6、消息消费者
            messageConsumer = session.createConsumer(destination);

            //7、接收消息:receive方法是一个阻塞方法,它会一直等待,直到接收到消息
            //   receive(Long 超时毫秒数) receiveNoWait()不等待
            while (true) {
                Message receiveMessage = messageConsumer.receive();

                if (receiveMessage instanceof TextMessage) {
                    String text = ((TextMessage) message).getText();
                    //可以进行业务逻辑的处理
                    System.out.println("接收到的消息为:" + text);
                }
            }


        } catch (Exception e) {

        } finally {
            if ( messageConsumer!= null) {
                try {
                    messageConsumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            //连接的关闭
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
注意:
发布订阅模式必须先启动订阅者,再启动发布者,否则消息接收不到



9 Queue与Topic比较

在这里插入图片描述

Queue模式,如果消息没有接收,消息存放在服务器,重启后消息还在。
Topic模式,没有被接收的消息,重启服务器后,消息就不存在。



10 拉模式与推模式

点对点消息,如果没有消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。在这种模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得 (拉模式)。

pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。



版权声明:本文为qq_38339124原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。