本以为之前对MQ的学习就要结束了,结果发现这才刚刚开始,送给刚入职场的每一位新人和我。
今天主要分享一下自己对Spring整合IBM MQ的一些学习心得。再次感谢一个csdn博主,名字没记住,头像是一个戴墨镜的皮卡丘
。
需求:定时查询数据库中的表,将新增数据查询出来并放入mq队列中。
1.模拟表字段:
2.引入ibm mq依赖
由于ibm mq依赖的jar包不在maven的中央仓库,如果要在pom.xml中依赖,需要先将jar安装进本地仓库或者私服
打开cmd,进入到mq的../java/lib下,执行下列命令
mvn install:install-file -Dfile=com.ibm.mq.defaultconfig.jar -DgroupId=com.ibm.mq -DartifactId=defaultconfig -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.commonservices.jar -DgroupId=com.ibm.mq -DartifactId=commonservices -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mqjms.jar -DgroupId=com.ibm -DartifactId=mqjms -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=jms.jar -DgroupId=com.ibm.mq -DartifactId=jms -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.headers.jar -DgroupId=com.ibm.mq -DartifactId=headers -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.jar -DgroupId=com.ibm -DartifactId=mq -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.jmqi.jar -DgroupId=com.ibm.mq -DartifactId=jmqi -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.pcf.jar -DgroupId=com.ibm.mq -DartifactId=pcf -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.postcard.jar -DgroupId=com.ibm.mq -DartifactId=postcard -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.soap.jar -DgroupId=com.ibm.mq -DartifactId=soap -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=com.ibm.mq.tools.ras.jar -DgroupId=com.ibm.mq.tools -DartifactId=ras -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=jms.jar -DgroupId=com.ibm.mq -DartifactId=jms -Dversion=7.5.0.1 -Dpackaging=jar
mvn install:install-file -Dfile=dhbcore.jar -DgroupId=com.ibm.mq -DartifactId=dhbcore -Dversion=7.5.0.1 -Dpackaging=jar
提示:-Dfile=“引入的文件名” -DgroupId=“maven中添加的组名” -DartifactId=“maven中添加的生成产物ID”
3.以下命令是本地仓库安装相应的jar包
安装完成后再pom.xml中引用依赖
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>jmqi</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>headers</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>mq</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>jms</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>dhbcore</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>mqjms</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>connector</artifactId>
<version>${ibm.mq.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
4.配置applicationContext-ibmmq.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<description>MQ</description>
<!-- 加载mq属性配置文件 -->
<context:property-placeholder location="classpath:properties/mqconnect.properties"/>
<!-- mq队列连接工厂 -->
<bean id="jmsConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName" value="${queue.manager.host}"/>
<property name="port" value="${app.mq.port}"/>
<property name="CCSID" value="${app.mq.ccsid}"/>
<property name="queueManager" value="${queue.manager}"/>
<property name="channel" value="${app.mq.channel}"/>
<property name="transportType" value="${app.mq.transportType}"/>
</bean>
<!-- spring缓存管理mq队列连接工厂 -->
<bean id="jmsQueueConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
<property name="sessionCacheSize" value="10"/>
</bean>
<!-- MQ sender queue setup -->
<bean id="senderQueue" class="com.ibm.mq.jms.MQQueue">
<property name="baseQueueManagerName" value="${queue.manager}"/>
<property name="baseQueueName" value="${queue.name}"/>
</bean>
<!-- the JMSTemplate setup for Sender Queue -->
<bean id="SenderJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
<property name="defaultDestination" ref="senderQueue"/>
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 队列发送程序 -->
<bean id="jmsSender" class="com.yusys.jms.JMSSender">
<property name="jmsTemplate" ref="SenderJMSTemplate" />
<property name="replyTo" ref="senderQueue" />
</bean>
<!-- MQ receiver queue setup -->
<bean id="receiverQueue" class="com.ibm.mq.jms.MQQueue">
<property name="baseQueueManagerName" value="${queue.manager}"/>
<property name="baseQueueName" value="${queue.name}"/>
</bean>
<!-- the JMSTemplate setup for Receiver Queue -->
<bean id="ReceiverJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
<property name="defaultDestination" ref="receiverQueue"/>
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 队列接收程序 -->
<bean id="jmsReceiver" class="com.yusys.jms.JMSReceiver">
<property name="jmsTemplate" ref="ReceiverJMSTemplate"/>
<property name="receiveQueue" ref="receiverQueue"/>
</bean>
<!-- spring 监听队列,一旦队列中有消息,马上触发监听 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsQueueConnectionFactory" />
<property name="destination" ref="receiverQueue" />
<property name="pubSubDomain" value="false" />
<property name="messageListener" ref="jmsReceiver" />
<property name="concurrentConsumers" value="20" />
</bean>
</beans>
5.mqconnect.properties mq属性配置文件
#localhost
queue.manager.host=127.0.0.1
queue.manager=bajie
queue.name=BAJIE_QUEUE
app.mq.port=1415
app.mq.channel=bajie_conn
app.mq.transportType=1
app.mq.ccsid=1381
6.消息发送程序JMSSender.java
package com.yusys.jms;
import java.util.Properties;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
import com.alibaba.fastjson.JSONObject;
public class JMSSender {
private JmsTemplate jmsTemplate;
private Properties msgHeader;
private Destination replyTo;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Properties getMsgHeader() {
return msgHeader;
}
public void setMsgHeader(Properties msgHeader) {
this.msgHeader = msgHeader;
}
public Destination getReplyTo() {
return replyTo;
}
public void setReplyTo(Destination replyTo) {
this.replyTo = replyTo;
}
public void sendMessage(Object object){
JSONObject json = new JSONObject();
json.put("orderData", object);
jmsTemplate.convertAndSend(replyTo,json.toJSONString());
}
}
7.消息接收程序JMSReceiver.java。请注意此类实现了MessageListener接口,覆盖onMessage方法,这样就可以在spring的监听容器中配置本来为监听程序,一旦监听的队列中有消息,就会触发本类中的onMessage方法。
package com.yusys.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import com.ibm.mq.jms.MQQueue;
import com.yusys.pojo.Users;
public class JMSReceiver implements MessageListener {
private JmsTemplate jmsTemplate;
private MQQueue receiveQueue;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public MQQueue getReceiveQueue() {
return receiveQueue;
}
public void setReceiveQueue(MQQueue receiveQueue) {
this.receiveQueue = receiveQueue;
}
@Override
public void onMessage(Message message) {
try {
System.out.println("reviced msg is:" +(new java.util.Date())+":"+ ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
public void processMessage(){
Message msg = jmsTemplate.receive(receiveQueue.getBaseQueueName());
try{
if (msg instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) msg;
try {
Users user = (Users) objMsg.getObject();
System.out.println("用户名:" + user.getUsername() + ",性别:"
+ user.getSex()+",电话:"+user.getTelephone());
} catch (JMSException e) {
e.printStackTrace();
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
8.单元测试程序:这里要说明一下,如果你要看到的是jmsReceiver控制台打印出jmsSender发送的数据,就必须把两个方法分开写在两个类中,一个类显示不出来,也可能是我的打开方式不对…
package com.junit;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.watson.jms.JMSReceiver;
import com.watson.jms.JMSSender;
import com.watson.jms.Person;
public class JMSSenderTest {
@Test
public void jmsSender() {
ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-ibmmq.xml");
JMSSender sender = (JMSSender)app.getBean("jmsSender");
Person person = new Person("watson",20);
sender.sendMesage(person);
}
public void jmsReceiver() {
ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-ibmmq.xml");
JMSReceiver receiver = (JMSReceiver)app.getBean("jmsReceiver");
receiver.processMessage();
}
}
9.这里最后再说一下定时器,我用的是Quartz框架,使用还是比较简单
<dependencies>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
10.简单的说一下Quartz的配置文件
<!-- 配置job类 -->
<bean id="scheduler" class="com.yusys.controller.msgController"/>
<!-- 配置JobDetail -->
<bean id="springQtzJobMethod" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<property name="targetObject">
<ref bean="scheduler"/>
</property>
<property name="targetMethod">
<value>execute</value>
</property>
</bean>
<!-- 配置trigger -->
<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="springQtzJobMethod"></property>
<property name="cronExpression" value="0/5 * * * * ?"></property>
</bean>
<!-- 配置调度工厂 -->
<bean id="SpringJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTriggerFactoryBean"/>
</list>
</property>
</bean>
11.controller层代码
package com.yusys.controller;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import com.alibaba.fastjson.JSONObject;
import com.yusys.jms.JMSReceiver;
import com.yusys.jms.JMSSender;
import com.yusys.pojo.Msg;
import com.yusys.service.msgService;
@Controller
public class msgController {
@Autowired
private msgService msgservice;
@Autowired
private JMSReceiver jmsReceiver;
@Autowired
private JMSSender jmsSender;
public void execute(){
List<Msg> list = msgservice.getMsgByStatus("未发送");
if (list!=null && list.size()>0) {
System.out.println(list.toString());
jmsSender.sendMessage(list);
}else{
System.out.println("未新增数据...");
}
}
}
12.service层代码(只写实现类,不要问我注入的是什么….肯定接口啦)
package com.yusys.service.impl;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.yusys.mapper.MsgMapper;
import com.yusys.pojo.Msg;
import com.yusys.pojo.MsgExample;
import com.yusys.pojo.MsgExample.Criteria;
import com.yusys.service.msgService;
@Service
public class msgServiceImpl implements msgService {
@Autowired
private MsgMapper msgMapper;
@Override
public List<Msg> getMsgByStatus(String status) {
MsgExample example = new MsgExample();
Criteria criteria = example.createCriteria();
criteria.andStatusEqualTo(status);
List<Msg> list = msgMapper.selectByExample(example);
List<Msg> list2 = null;
if (list != null && list.size() > 0) {
list2 = list;
for (int i = 0; i < list.size(); i++) {
Msg msg = list.get(i);
msg.setStatus("已发送");
msgMapper.updateByPrimaryKey(msg);
}
}
return list2;
}
}
13.dao层用的逆向工程,当然也可以自己手写,都一样。
看到这里,基本上也差不过了该结束了,还有好多想说的都不知道该怎么表达…………………….
个人理解奉上:
1.mq中的send和rece方法是针对同一个队列进行的操作。
2.rece中的监听方法,会从mq队列中取出数据,所有如果你要存数据到mq队列中,要在配置文件中注销监听器方法。