基于maven的Spring+ActiveMQ整合Demo

  • Post author:
  • Post category:其他


本文主要是示范基于Maven的ActiveMQ+Spring的简单使用,基于ActiveMQ消息代理的Spring JMS消息配置,以及定时任务的使用。

JMS简介

JMS提供了应用之间的异步通信机制,当异步发送消息时,客户端不需要等待服务端处理消息结果。

同步通信与异步通信

构建JMS

两个主要概念:

消息代理

(message broker)和

目的地

(destination)。

当一个应用发送消息时,会把消息交给一个消息代理。消息代理实际上是JMS版的邮局。消息代理可以确保消息被投递到指定的目的地,同时释放发送者,使其能够继续其他的业务。

目的地就像一个邮箱,可以将消息放入邮箱,直至有人将其取走。JMS中有两种类型的目的地:

队列



主题

。,分别应用于队列的点对点模型和主题的发布/订阅模型。

点对点消息模型

每一个消息都有一个发送者和一个接收者。

点对点消息模型

当消息代理得到消息后,会将消息放入到消息队列中,接收者请求队列中的下一条消息时,该消息就会从队列中取出,投递给接收者。因为消息投递后会从队列中删除,因此可以保证消息只投递给一个接收者。

可以使用多个接收者来处理队列中的消息,不过每个接收者都会处理自己接收到的消息,需要多个接收者监听队列。

发布-订阅消息模型

消息会发送给一个

主题

,多个接收者可以监听一个主题。与队列不同的是,消息不再是只投递给一个接收者,所有主题的订阅者都可以接收到此消息。

发布-订阅消息模型

JMS的优点

  • 无需等待
  • 面向消息和解耦
  • 位置独立
  • 确保投递

在Spring中搭建消息代理

安装ActiveMQ


ActiveMQ

是一个开源的消息代理, 也是使用JMS进行异步消息传递的最佳选择。在

官方网站下载

后,解压缩安装包,点击

apache-activemq-5.12.1\bin\activemq.bat

运行即可(64位操作系统可能需要点击

apache-activemq-5.12.1\bin\win64\activemq.bat

运行)。运行后进入

http://localhost:8161/

表明安装成功, 这时就可以使用它进行消息代理了。

maven配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.springframework.samples.service.service</groupId>
    <artifactId>activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>

        <!-- Generic properties -->
        <java.version>1.6</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Web -->
        <jsp.version>2.2</jsp.version>
        <jstl.version>1.2</jstl.version>
        <servlet.version>2.5</servlet.version>


        <!-- Spring -->
        <spring-framework.version>3.2.3.RELEASE</spring-framework.version>

        <!-- Logging -->
        <logback.version>1.0.13</logback.version>
        <slf4j.version>1.7.5</slf4j.version>

        <!-- Test -->
        <junit.version>4.11</junit.version>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>1.8.5</version>
        </dependency>
        <!-- Spring MVC -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <!-- Other Web dependencies -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
            <version>${jstl.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>${servlet.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet.jsp</groupId>
            <artifactId>jsp-api</artifactId>
            <version>${jsp.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spring and Transactions -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>jsr250-api</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

    </dependencies>
</project>

Spring配置

使用JMS连接工厂通过消息代理发送消息,因为选择了ActiveMQ作为消息代理,因此需要配置JMS连接工厂,让它知道如何连接到ActiveMQ。

ActiveMQConnectionFactory

是ActiveMQ自带的连接工厂,可以在Spring中进行配置。

使用

JmsTemplate

可以创建连接、获得会话以及发送和接收消息。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Activemq 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="system1" />
        <constructor-arg value="manager1" />
        <constructor-arg value="failover:(tcp://localhost:61616)?timeout=2000" />
    </bean>

    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="activeMQConnectionFactory" />
    </bean>

    <!-- Default Destination Queue Definition -->
    <!-- 测试配置多个Destination -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="test.activemq.queue" />
    </bean>

    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destination" />
    </bean>

    <!-- Message Sender Definition -->
    <bean id="messageSender" class="activemq.publisher.MessageSender">
        <constructor-arg index="0" ref="jmsTemplate" />
        <constructor-arg index="1" ref="destination" />
    </bean>

    <!-- 消息监听器,主要监听的目的地址 Message Receiver Definition -->
    <bean id="messageReceiver" class="activemq.consumer.MessageReceiver">
    </bean>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="test.activemq.queue" />
        <property name="messageListener" ref="messageReceiver" />
    </bean>

</beans>

发送消息

当调用

JmsTemplate



send()

方法,

JmsTemplate

将负责JMS连接、会话并代表发送者发送消息。这里使用了默认的消息目的地。

JmsTemplate发送消息

package activemq.publisher;

import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;

public class MessageSender {

    private final JmsTemplate jmsTemplate;
    private final Destination destination;

    public MessageSender(final JmsTemplate jmsTemplate, final Destination destination) {
        this.jmsTemplate = jmsTemplate;
        this.destination = destination;
    }

    public void send(final String text) {
        try {
            jmsTemplate.setDefaultDestination(destination);
            jmsTemplate.convertAndSend(text);
            System.out.println("发送消息 : " + text);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

接收消息监听器

package activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MessageReceiver implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println("接收到消息: " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

使用Spring的定时任务定时发送消息。

定时任务配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">

    <context:annotation-config /> 
    <bean id="QuartzFactoryBean"
        class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="capacityDataPublisherJobTrigger" />
            </list>
        </property>
    </bean>

    <bean id="capacityDataPublisherJob" class="activemq.TestSenderService"
    init-method="run">
    </bean>

    <bean id="capacityDataPublisherJobTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
        <property name="jobDetail" ref="capacityDataPublisherJobDetail" />
        <property name="cronExpression">
            <value>0 0/5 * * * ?</value>
        </property>
    </bean>
    <bean id="capacityDataPublisherJobDetail"
        class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="capacityDataPublisherJob" />
        <property name="targetMethod" value="run" />
        <property name="concurrent" value="false" />
    </bean>

</beans>

定时任务

package activemq;

import org.springframework.beans.factory.annotation.Autowired;
import activemq.publisher.MessageSender;

public class TestSenderService {

    @Autowired
    private MessageSender messageSender;

    public void run() {
        messageSender.send("message");
    }

}

测试结果

在tomcat中运行项目。

运行后发送了两条消息,消息队列中显示:

此处输入图片的描述

重启项目时,接收消息监听器会处理队列中所有的消息,项目运行时,每次发送消息成功后都会触发接收消息监听器:

此处输入图片的描述

入列和出列:

此处输入图片的描述


代码获取地址:


https://github.com/hoxis/JavaWeb/tree/master/activemq


如果觉得有用,欢迎关注我的微信,

海量学习资源免费送!

你的关注是对我最大的鼓励!



版权声明:本文为bruce_6原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。