消息队列:请求——>消息
1 、加入依赖
<!-- ActiveMQ message queue dependencies -->
<!-- Spring Boot starter for ActiveMQ (version is inherited from the Boot dependency management) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- JMS 1.1 API (javax.jms.*) -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<!-- ActiveMQ core classes, pinned to 5.7.0 -->
<!-- NOTE(review): this explicit 5.7.0 pin sits next to whatever ActiveMQ version the Boot starter pulls in; confirm the two do not conflict on the classpath -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<!-- ActiveMQ connection pooling support, pinned to the same 5.7.0 version as activemq-core -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
2、实现工具类
package cn.bdqn.common;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import javax.jms.Topic;
/**
 * Utility bean for sending messages to the message queue.
 *
 * <p>Both methods delegate to the injected {@link JmsMessagingTemplate}; the only
 * difference is the kind of destination object (queue or topic) built from the
 * given name.
 */
@Component
public class MqUtils {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Sends a message in queue mode.
     *
     * @param target  name used to construct the {@link ActiveMQQueue} destination
     * @param message payload passed to {@code convertAndSend}
     */
    public void sendQueueMessage(String target, Object message) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(target), message);
    }

    /**
     * Publishes a message in topic mode.
     *
     * @param target  name used to construct the {@link ActiveMQTopic} destination
     * @param message payload passed to {@code convertAndSend}
     */
    public void sendTopicMessage(String target, Object message) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(target), message);
    }
}
package cn.bdqn.config;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
/**
 * Spring configuration for the ActiveMQ connection factory, the messaging
 * template used for sending, and the listener container factory.
 *
 * <p>All settings are read from the {@code spring.activemq.*} and
 * {@code spring.jms.pub-sub-domain} properties.
 */
@Configuration
public class MqConfig {

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.packages.trust-all}")
    private boolean trustAllPackages;

    @Value("${spring.jms.pub-sub-domain}")
    private boolean pubSubDomain;

    /**
     * Builds the ActiveMQ connection factory from the configured broker URL,
     * credentials and trust-all-packages flag.
     *
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        // The configured user must be applied with setUserName; setBeanName only
        // records a bean name and would leave the connection without a user name.
        factory.setUserName(userName);
        factory.setPassword(password);
        factory.setTrustAllPackages(trustAllPackages);
        return factory;
    }

    /**
     * Creates the messaging template used to send messages, backed by
     * {@link #activeMQConnectionFactory()}.
     *
     * @return the messaging template
     */
    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate() {
        return new JmsMessagingTemplate(activeMQConnectionFactory());
    }

    /**
     * Creates the listener container factory.
     *
     * @return the listener container factory, with its pub-sub domain flag taken
     *         from {@code spring.jms.pub-sub-domain}
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainer() {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        // true switches the listeners to publish/subscribe (topic) mode;
        // false keeps the producer/consumer (queue) mode.
        containerFactory.setPubSubDomain(pubSubDomain);
        containerFactory.setConnectionFactory(activeMQConnectionFactory());
        return containerFactory;
    }
}
3、配置文件
# MQ configuration
spring:
  activemq:
    # broker: relays the sending and receiving of messages
    broker-url: tcp://192.168.219.132:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=30000&jms.redeliveryPolicy.maximumRedeliveries=10
    user: admin
    password: admin
    packages:
      trust-all: true
  jms:
    pub-sub-domain: false # true = publish/subscribe (topic) mode, false = queue mode
ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
optimizeAcknowledge:可优化的ACK机制。未开启时,消费者每消费一条消息就立即反馈给MQ;开启后,消费者消费了一定量的消息后再统一反馈给MQ(批量反馈)
optimizeAcknowledgeTimeOut:可优化的ACK机制的最大超时时间(单位:毫秒),即两次批量反馈之间允许的最长间隔;超过该时间后,即使还没有攒够一批,也会把已消费消息的ACK反馈给MQ
maximumRedeliveries:最大重新投递次数
spring.activemq.packages.trust-all=true:信任所有包,保证序列化对象成功
spring.jms.pub-sub-domain:设置为 true 时只支持 Topic(发布订阅)模式;设置为 false 时只支持 Queue(队列)模式
4 代码实现
4.1controller中接受请求 转为消息并发送
请求信息一般交给service处理 , 在service中封装成message , 封装类要实现序列化implements Serializable
/**
 * Demo controller that turns an HTTP request into messages: one sent in queue
 * mode and one published in topic mode.
 */
@RestController
@RequestMapping("/msg")
public class TestMessageController {

    /** Destination name used for the queue-mode message. */
    private static final String QUEUE_NAME = "TestQ";

    /** Destination name used for the topic-mode message. */
    private static final String TOPIC_NAME = "TestT";

    @Autowired
    private MqUtils mqUtils;

    /**
     * Handles {@code /msg/send}: sends one queue message and one topic message
     * through {@code MqUtils}.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/send")
    public String sendMessage() {
        mqUtils.sendQueueMessage(QUEUE_NAME, "队列模式的消息");
        mqUtils.sendTopicMessage(TOPIC_NAME, "发布订阅模式的消息");
        return "success";
    }
}
4.2接受消息并处理
注意注解 @JmsListener(destination = "TestQ")
/**
 * Listener for the destination {@code "TestQ"}; prints each received message
 * to standard output.
 *
 * @param message the received message
 */
@JmsListener(destination = "TestQ")
public void receiveQueueMessage2(Object message) {
    final String line = "receiveQueueMessage2:" + message;
    System.out.println(line);
}
/**
 * Listener for the destination {@code "TestT"}; prints each received message
 * to standard output.
 *
 * <p>NOTE(review): whether "TestT" is treated as a topic or as a queue depends on
 * the pub-sub-domain setting of the listener container in use; confirm it is
 * enabled if topic messages are expected here.
 *
 * @param message the received message
 */
@JmsListener(destination = "TestT")
public void receiveTopicMessage1(Object message) {
    final String line = "receiveTopicMessage1:" + message;
    System.out.println(line);
}
版权声明:本文为qq_42237676原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。