A Hands-On Example of Integrating Spring with IBM MQ




I thought my study of MQ was about finished, only to find it had just begun. This post is for everyone new to the workplace, myself included.


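Today I want to share what I learned while integrating Spring with IBM MQ. Thanks again to a CSDN blogger whose name I cannot remember; their avatar was a Pikachu wearing sunglasses.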


Requirement: query a database table on a schedule, pick up the newly added rows, and put them on an MQ queue.
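Before going through the real configuration, the requirement can be sketched in plain Java. This is an in-memory illustration only, not the code used later in this post: the `Row` class, the `NOT_SENT`/`SENT` status values, and the `BlockingQueue` standing in for the MQ queue are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory sketch of "poll the table, forward new rows to a queue". */
final class PollAndForwardSketch {

    /** Hypothetical stand-in for one table row; the real columns are not shown here. */
    static final class Row {
        final int id;
        String status;
        Row(int id, String status) { this.id = id; this.status = status; }
    }

    static final String NOT_SENT = "NOT_SENT"; // placeholder status values
    static final String SENT = "SENT";

    /** One polling pass: forward every unsent row, then flag it as sent. Returns the count. */
    static int pollOnce(List<Row> table, BlockingQueue<String> queue) {
        int forwarded = 0;
        for (Row row : table) {
            if (NOT_SENT.equals(row.status)) {
                queue.add("row-" + row.id); // stands in for the MQ put
                row.status = SENT;          // so the next pass skips this row
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, NOT_SENT));
        table.add(new Row(2, SENT));
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(pollOnce(table, queue)); // first pass forwards row 1
        System.out.println(pollOnce(table, queue)); // second pass finds nothing new
    }
}
```

The status flag is what keeps a row from being forwarded twice; the rest of the post wires the same idea to a real table, Quartz, and IBM MQ.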


1. Mock table fields:






2. Add the IBM MQ dependencies


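The IBM MQ jars used here (version 7.5.0.1) are not in the Maven Central repository, so before you can declare them in pom.xml you must install them into your local repository or a private repository server.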


Open a command prompt, change to the ../java/lib directory of the MQ installation, and run the following commands:

    mvn install:install-file -Dfile=com.ibm.mq.defaultconfig.jar -DgroupId=com.ibm.mq -DartifactId=defaultconfig -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.commonservices.jar -DgroupId=com.ibm.mq -DartifactId=commonservices -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mqjms.jar -DgroupId=com.ibm -DartifactId=mqjms -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=jms.jar -DgroupId=com.ibm.mq -DartifactId=jms -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.headers.jar -DgroupId=com.ibm.mq -DartifactId=headers -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.jar -DgroupId=com.ibm -DartifactId=mq -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.jmqi.jar -DgroupId=com.ibm.mq -DartifactId=jmqi -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.pcf.jar -DgroupId=com.ibm.mq -DartifactId=pcf -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.postcard.jar -DgroupId=com.ibm.mq -DartifactId=postcard -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.soap.jar -DgroupId=com.ibm.mq -DartifactId=soap -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=com.ibm.mq.tools.ras.jar -DgroupId=com.ibm.mq.tools -DartifactId=ras -Dversion=7.5.0.1 -Dpackaging=jar  
      
    mvn install:install-file -Dfile=dhbcore.jar -DgroupId=com.ibm.mq -DartifactId=dhbcore -Dversion=7.5.0.1 -Dpackaging=jar  


Tip: -Dfile is the jar file to install, -DgroupId is the group ID you will reference in Maven, and -DartifactId is the artifact ID you will reference in Maven.
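To see how these three options (plus -Dversion and -Dpackaging) determine where the jar ends up, here is a small sketch of the standard Maven local repository layout. The class and method names are made up for illustration; the repository root is assumed to be the default ~/.m2/repository.

```java
/** Sketch: where `mvn install:install-file` places a jar in the local repository. */
final class LocalRepoPathSketch {

    /** Builds the path relative to the local repository root (by default ~/.m2/repository). */
    static String relativePath(String groupId, String artifactId, String version, String packaging) {
        return groupId.replace('.', '/') + "/" + artifactId + "/" + version
                + "/" + artifactId + "-" + version + "." + packaging;
    }

    public static void main(String[] args) {
        // Coordinates taken from the jmqi install command above
        System.out.println(relativePath("com.ibm.mq", "jmqi", "7.5.0.1", "jar"));
    }
}
```

If a dependency fails to resolve later, checking that the expected file exists under this path is a quick way to spot a mistyped groupId or artifactId.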


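3. The commands above install the jars into the local repository


Once installation is complete, declare the dependencies in pom.xml. The ${ibm.mq.version} property must be defined in the pom's <properties> section (7.5.0.1, to match the commands above). Note that the connector artifact listed below is not among the jars installed above, so it has to be installed the same way before Maven can resolve it.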

    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>jmqi</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>headers</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm</groupId>
        <artifactId>mq</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>jms</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>dhbcore</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm</groupId>
        <artifactId>mqjms</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>connector</artifactId>
        <version>${ibm.mq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${org.springframework.version}</version>
    </dependency>



4. Configure applicationContext-ibmmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     
    xmlns:context="http://www.springframework.org/schema/context"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    				    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
        				http://www.springframework.org/schema/context 
        				http://www.springframework.org/schema/context/spring-context-3.1.xsd">  
    <description>MQ</description>
    <!-- Load the MQ properties file -->
    <context:property-placeholder location="classpath:properties/mqconnect.properties"/>
    <!-- MQ queue connection factory -->
    <bean id="jmsConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    	<property name="hostName" value="${queue.manager.host}"/>
    	<property name="port" value="${app.mq.port}"/>
    	<property name="CCSID" value="${app.mq.ccsid}"/>
    	<property name="queueManager" value="${queue.manager}"/>
    	<property name="channel" value="${app.mq.channel}"/>
    	<property name="transportType" value="${app.mq.transportType}"/>
    </bean>
    <!-- Spring caching wrapper around the MQ queue connection factory -->
    <bean id="jmsQueueConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    	<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    	<property name="sessionCacheSize" value="10"/>
    </bean>
    
    <!-- MQ sender queue setup -->
    <bean id="senderQueue" class="com.ibm.mq.jms.MQQueue">
    	<property name="baseQueueManagerName" value="${queue.manager}"/>
    	<property name="baseQueueName" value="${queue.name}"/>
    </bean>
    <!-- the JMSTemplate setup for Sender Queue -->
    <bean id="SenderJMSTemplate" class="org.springframework.jms.core.JmsTemplate">
    	<property name="connectionFactory" ref="jmsQueueConnectionFactory"/>  
        <property name="defaultDestination" ref="senderQueue"/>  
        <property name="pubSubDomain" value="false"/>
    </bean>
    <!-- Queue sender -->
    <bean id="jmsSender" class="com.yusys.jms.JMSSender">    
        <property name="jmsTemplate" ref="SenderJMSTemplate" />    
        <property name="replyTo" ref="senderQueue" />  
    </bean>  
    
    
    
    <!-- MQ receiver queue setup -->
    <bean id="receiverQueue" class="com.ibm.mq.jms.MQQueue">
    	<property name="baseQueueManagerName" value="${queue.manager}"/>   
        <property name="baseQueueName" value="${queue.name}"/>   
    </bean>
    <!-- the JMSTemplate setup for Receiver Queue -->    
    <bean id="ReceiverJMSTemplate" class="org.springframework.jms.core.JmsTemplate">    
        <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>  
        <property name="defaultDestination" ref="receiverQueue"/>   
        <property name="pubSubDomain" value="false"/>   
    </bean>   
    <!-- Queue receiver -->
    <bean id="jmsReceiver" class="com.yusys.jms.JMSReceiver">    
        <property name="jmsTemplate" ref="ReceiverJMSTemplate"/>  
        <property name="receiveQueue" ref="receiverQueue"/>  
    </bean>   
      
      
    <!-- Spring listener container: the listener fires as soon as a message is on the queue -->
    <bean id="jmsContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
        <property name="connectionFactory" ref="jmsQueueConnectionFactory" />    
        <property name="destination" ref="receiverQueue" />
        <property name="pubSubDomain" value="false" />    
        <property name="messageListener" ref="jmsReceiver" />    
        <property name="concurrentConsumers" value="20" />   
    </bean> 
</beans>
        
        



5. mqconnect.properties, the MQ properties file

#localhost
queue.manager.host=127.0.0.1
queue.manager=bajie
queue.name=BAJIE_QUEUE
app.mq.port=1415
app.mq.channel=bajie_conn
app.mq.transportType=1
app.mq.ccsid=1381
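Spring resolves these keys through the ${...} placeholders in the XML above. As a rough illustration of what the values look like once read and converted, here is a plain-Java sketch; the class name, the `parse` helper, and loading from a string instead of the classpath are assumptions made to keep the example self-contained. In the IBM MQ JMS client, a transport type of 1 selects a client (TCP) connection rather than local bindings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Sketch: read the MQ connection settings and convert the numeric ones to int. */
final class MqSettingsSketch {
    final String host;
    final String queueManager;
    final String queueName;
    final String channel;
    final int port;
    final int transportType;
    final int ccsid;

    private MqSettingsSketch(Properties p) {
        host = require(p, "queue.manager.host");
        queueManager = require(p, "queue.manager");
        queueName = require(p, "queue.name");
        channel = require(p, "app.mq.channel");
        port = Integer.parseInt(require(p, "app.mq.port"));
        transportType = Integer.parseInt(require(p, "app.mq.transportType"));
        ccsid = Integer.parseInt(require(p, "app.mq.ccsid"));
    }

    /** Fails fast when a key is missing or blank. */
    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }

    /** Parses properties text; a real application would load the file from the classpath. */
    static MqSettingsSketch parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new MqSettingsSketch(p);
    }

    public static void main(String[] args) {
        MqSettingsSketch s = parse(String.join("\n",
                "#localhost",
                "queue.manager.host=127.0.0.1",
                "queue.manager=bajie",
                "queue.name=BAJIE_QUEUE",
                "app.mq.port=1415",
                "app.mq.channel=bajie_conn",
                "app.mq.transportType=1",
                "app.mq.ccsid=1381"));
        System.out.println(s.host + ":" + s.port + " " + s.queueManager + "/" + s.queueName);
    }
}
```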



6. The message sender, JMSSender.java

package com.yusys.jms;

import java.util.Properties;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;

import com.alibaba.fastjson.JSONObject;

public class JMSSender {
	private JmsTemplate jmsTemplate;
	private Properties msgHeader;
	private Destination replyTo;
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public Properties getMsgHeader() {
		return msgHeader;
	}
	public void setMsgHeader(Properties msgHeader) {
		this.msgHeader = msgHeader;
	}
	public Destination getReplyTo() {
		return replyTo;
	}
	public void setReplyTo(Destination replyTo) {
		this.replyTo = replyTo;
	}
	
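	public void sendMessage(Object object){
		// Wrap the payload as {"orderData": ...} and send it as a JSON text message.
		// Despite its name, replyTo is the destination the message is sent to.
		JSONObject json = new JSONObject();
		json.put("orderData", object);
		jmsTemplate.convertAndSend(replyTo,json.toJSONString());
	}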
}



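7. The message receiver, JMSReceiver.java. Note that this class implements the MessageListener interface and overrides onMessage, so it can be registered as the listener in Spring's listener container. As soon as a message arrives on the monitored queue, this class's onMessage method is invoked.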

package com.yusys.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;

import com.ibm.mq.jms.MQQueue;
import com.yusys.pojo.Users;

public class JMSReceiver implements MessageListener {

	private JmsTemplate jmsTemplate;
	
	private MQQueue receiveQueue;
	
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	public MQQueue getReceiveQueue() {
		return receiveQueue;
	}
	public void setReceiveQueue(MQQueue receiveQueue) {
		this.receiveQueue = receiveQueue;
	}
	@Override
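	public void onMessage(Message message) {
		// JMSSender sends JSON text, so only TextMessage is handled here
		if (!(message instanceof TextMessage)) {
			System.out.println("ignored non-text message: " + message);
			return;
		}
		try {
			System.out.println("received msg is:" + (new java.util.Date()) + ":" + ((TextMessage) message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}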
	
	public void processMessage(){
		// receive() blocks until a message arrives or the template's receiveTimeout
		// elapses; in the latter case it returns null
		Message msg = jmsTemplate.receive(receiveQueue.getBaseQueueName());
		if (msg instanceof ObjectMessage) {
			try {
				Users user = (Users) ((ObjectMessage) msg).getObject();
				System.out.println("username: " + user.getUsername() + ", sex: "
						+ user.getSex() + ", telephone: " + user.getTelephone());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}



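8. Unit test. One thing to note: if you want to see jmsReceiver print the data sent by jmsSender on the console, I found that the two methods had to be written in two separate classes; with both in one class nothing showed up. It may also be that I was doing it wrong… (Note that jmsReceiver() below carries no @Test annotation, so JUnit does not run it as written.)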

    package com.junit;  
    import org.junit.Test;  
    import org.springframework.context.ApplicationContext;  
    import org.springframework.context.support.ClassPathXmlApplicationContext;  
      
    import com.yusys.jms.JMSReceiver;
    import com.yusys.jms.JMSSender;
    // Person is a simple POJO whose source is not shown in this post
    import com.watson.jms.Person;
      
    public class JMSSenderTest {  
          
        @Test  
        public void jmsSender() {  
            ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-ibmmq.xml");  
            JMSSender sender = (JMSSender)app.getBean("jmsSender");  
            Person person = new Person("watson",20);  
            sender.sendMessage(person);
        }  
          
          
        public void jmsReceiver() {  
            ApplicationContext app = new ClassPathXmlApplicationContext("applicationContext-ibmmq.xml");  
            JMSReceiver receiver = (JMSReceiver)app.getBean("jmsReceiver");  
            receiver.processMessage();  
        }  
      
    }  



9. Finally, a word about the scheduler. I use the Quartz framework, which is fairly simple to use.

    <dependencies>  
        <dependency>  
            <groupId>org.quartz-scheduler</groupId>  
            <artifactId>quartz</artifactId>  
            <version>2.2.2</version>  
        </dependency>  
    </dependencies>  



10. A quick look at the Quartz configuration

<!-- Job class -->
	<bean id="scheduler" class="com.yusys.controller.msgController"/> 
	<!-- JobDetail: invoke execute() on the job bean -->
	<bean id="springQtzJobMethod" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject">
			<ref bean="scheduler"/>
		</property>
		<property name="targetMethod">
			<value>execute</value>
		</property>
	</bean>
	<!-- Trigger: fire every 5 seconds -->
	<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
		<property name="jobDetail" ref="springQtzJobMethod"></property>
		<property name="cronExpression" value="0/5 * * * * ?"></property>
	</bean>
	<!-- Scheduler factory -->
	<bean id="SpringJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<list>
				<ref bean="cronTriggerFactoryBean"/>
			</list>
		</property>
	</bean> 
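The cron expression 0/5 * * * * ? means "starting at second 0, every 5 seconds", so the job fires whenever the second-of-minute is 0, 5, 10, and so on up to 55. The following sketch is not Quartz code; it only reproduces that arithmetic with java.time to make the firing times concrete. The class and method names are made up for the example.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Sketch of the firing times produced by the seconds field "0/5". */
final class EveryFiveSecondsSketch {

    /** Returns the next time strictly after `now` whose second-of-minute is a multiple of 5. */
    static LocalDateTime nextFire(LocalDateTime now) {
        LocalDateTime base = now.truncatedTo(ChronoUnit.SECONDS);
        int step = 5 - (base.getSecond() % 5); // between 1 and 5 seconds ahead
        return base.plusSeconds(step);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println("now:       " + now);
        System.out.println("next fire: " + nextFire(now));
    }
}
```

With a 5-second interval, execute() should finish well within 5 seconds, otherwise consecutive runs can overlap unless the job detail is configured as non-concurrent.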
	



11. Controller layer code

package com.yusys.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;

import com.alibaba.fastjson.JSONObject;
import com.yusys.jms.JMSReceiver;
import com.yusys.jms.JMSSender;
import com.yusys.pojo.Msg;
import com.yusys.service.msgService;

@Controller
public class msgController {
	@Autowired
	private msgService msgservice;
	@Autowired
	private JMSReceiver jmsReceiver;
	@Autowired
	private JMSSender jmsSender;
	
	public void execute(){
		// "未发送" means "not sent"; it must match the status value stored in the table
		List<Msg> list = msgservice.getMsgByStatus("未发送");
		if (list!=null && list.size()>0) {
			System.out.println(list.toString());
			jmsSender.sendMessage(list);
		}else{
			System.out.println("no new data...");
		}
	}
}



12. Service layer code (only the implementation class is shown; what gets injected is, of course, the interface)

package com.yusys.service.impl;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.yusys.mapper.MsgMapper;
import com.yusys.pojo.Msg;
import com.yusys.pojo.MsgExample;
import com.yusys.pojo.MsgExample.Criteria;
import com.yusys.service.msgService;
@Service
public class msgServiceImpl implements msgService {

	@Autowired
	private MsgMapper msgMapper;
	
	@Override
	public List<Msg> getMsgByStatus(String status) {
		MsgExample example = new MsgExample();
		Criteria criteria = example.createCriteria();
		criteria.andStatusEqualTo(status);
		List<Msg> list = msgMapper.selectByExample(example);
		if (list != null) {
			// Mark every fetched row as "已发送" ("sent") so the next poll skips it.
			// Note: this happens before the message is actually put on the queue.
			for (Msg msg : list) {
				msg.setStatus("已发送");
				msgMapper.updateByPrimaryKey(msg);
			}
		}
		return list;
	}

}



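13. The DAO layer was generated by reverse engineering from the database table. You can of course write it by hand instead; the result is the same.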




That is about it, and a good place to stop. There is a lot more I would like to say, but I do not know how to put it into words.


My own takeaways:


1. In this setup, the send and receive methods operate on the same queue.


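2. The listener method in the receiver takes messages off the MQ queue, so if you want messages to stay on the queue, comment out the listener container in the configuration file.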
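The second point can be reproduced without MQ at all. The sketch below is only an analogy, assuming a java.util.concurrent BlockingQueue in place of the MQ queue and a plain thread in place of Spring's listener container; the class and method names are made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: an active listener removes the message before anyone else can read it. */
final class ListenerDrainsQueueSketch {

    /** Puts one message, optionally with a listener attached, and returns what a later poll sees. */
    static String sendThenPoll(boolean listenerActive) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch consumed = new CountDownLatch(1);
        if (listenerActive) {
            Thread listener = new Thread(() -> {
                try {
                    queue.take();         // blocks until a message arrives, then removes it
                    consumed.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            listener.setDaemon(true);
            listener.start();
        }
        try {
            queue.put("hello");
            if (listenerActive) {
                consumed.await(5, TimeUnit.SECONDS); // give the listener time to take the message
            }
            return queue.poll(100, TimeUnit.MILLISECONDS); // null when nothing is left
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with listener:    " + sendThenPoll(true));
        System.out.println("without listener: " + sendThenPoll(false));
    }
}
```

With the listener running, the later poll finds the queue empty; without it, the message is still there. The same reasoning explains why messages only remain visible on the MQ queue once the jmsContainer bean is commented out.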



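Copyright notice: This is an original article by wanggangabc111, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.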