前言
上一篇介绍了WebSphere MQ的安装过程,如果你现在还没有安装WebSphere MQ,不妨看一看安装教程:☞
IBMMQ安装
在常见的互联网消息中间件中,WebSphere MQ常见与银行业务使用,IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。
WebSphere MQ的基本概念
掌握和理解WebSphere MQ概念,可以帮助我们快速进行开发其结构图如图所示:
-
消息 (Message)
消息是WebSphere MQ 中最小的概念,它是客户端输入数据的一个载体,包含
数据消息头
和
数据消息体
。消息数据头是对消息属性的描述,这段信息往往被队列管理器用来确定对消息的处理。
消息数据头
可以由应用程序或系统的消息服务程序共同产生,它包含了消息在 传送中的必要信息,如目标队列管理器的名字,目标队列的名字,以及消息的其它一些属性。
消息数据体
就是应用传递信息的载体可以是二进制数据也可以是字符串,具体使用可以按照实际需求进行选择,消息可以分成持久 (Persistent) 消息和非持久 (Non-Persistent) 消息。所谓 “持久”的意思,就是在 WebSphere MQ 队列管理器重启动后,消息是否仍然能保持。 -
队列 (Queue)
我们可以简单地把队列看成一个容器,用于存放消息的载体。队列按其定义可分成本地队列、远程队列、别名队列、模型队列。其中需要说明的是本地队列是使用较多的,一般是我们使用java调用api的那个队列就是本地队列,本地队列存储消息都存放在本地存储空间,而远程队列是目标队列在本地的一个引用。 -
队列管理器
队列管理器是为应用程序提供消息传递服务的程序,队列管理器构建了独立的 MQ 的运行环境,它是消息队列的管理者,用来维护和管理消息队列。一台机器可以创建多个队列管理器的实例,但是需要注意的是队列管理器的名称是唯一的且一个队列只能属于一个队列管理器。 -
通道 (Channel)
通道是两个队列管理器之间的一种单向的点对点的通信连接,通道是两个队列管理器之间的一种单向的点对点的通信连接,消息在通道中只能单向流 动。如果需要双向交流,可以建立一对通道,一来一去。站在队列管理器的角度,这一对通道可以按消息的流向分成输入通道和输出通道。通过配置,对于放入本地传输队列中的消息,队列管理器会自动将其通过输出通道发出,送入对方的远程目标队列。
下面将演示队列管理器、队列、通道等相关初始化配置,如下图所示:
window下的IBMMQ使用和配置
- 队列管理器配置
-
在“导航器”视图中,右键单击队列管理器文件夹,然后单击新建 > 队列管理器。 此时会打开“创建队列管理器”向导。
-
在队列管理器名称字段中,输入 Queue_Manager。
-
设置队列日志记录, 可以设置MQ存储的文件目录,也可以使用默认缺省路径。
-
勾选创建队列管理器启动队列管理器以及创建服务器连接通道
- 确保选中创建配置用于 TCP/IP 的侦听器复选框,默认监听的端口是1414,可以根据实际情况进行修改。
6. 配置MQ断开连接自动重连的时间。
- 创建队列
-
打开队列管理器选中队列->新建->本地队列
-
输入队列名称点击完成
- 创建连接通道
-
连接通道在队列管理器下面的高级,选择服务器连接通道如下图所示:
-
输入连接通道名称
-
点击完成
-
WebSphere MQ的权限配置
上面操作有可能会提示下面的错误,如下图所示:
此时就需要我们分配一下当前window系统用户的组权限问题具体步骤如下图所示:
-
打开系统开始-> 输入mmc
-
选择文件->添加/删除管理单元选项
-
添加本地 用户和组
-
查看WebSphere的组名,右键属性可以看到其属于
mqm
组
5. 当前登陆的用户是codegeekgao,右键属性为其添加组mqm应用并确定后,重启计算机以使配置生效。
Linux下的IBMMQ使用和配置
- 队列管理器相关命令
crtmqm -q QM_TEST 建立默认队列管理器QM_TEST
strmqm QM_TEST 启动队列管理器QM_TEST
runmqsc QM_TEST 运行队列管理器QM_TEST
endmqm QM_TEST 停止队列管理器QM_TEST
dltmqm QM_TEST 删除队列管理器QM_TEST
- 创建QM_TEST的队列管理器队列
建立默认队列管理器QM_TEST
[mqm@hadoop1 bin]$ crtmqm -q QM_TEST
WebSphere MQ queue manager created.
Directory '/var/mqm/qmgrs/QM_TEST' created.
The queue manager is associated with installation 'Installation1'.
Creating or replacing default objects for queue manager 'QM_TEST'.
Default objects statistics : 79 created. 0 replaced. 0 failed.
Completing setup.
Setup completed.
- 查看队列状态
// Ended未启用
[mqm@hadoop1 bin]$ dspmq
QMNAME(QM_TEST) STATUS(Ended immediately)
- 启动队列管理器
[mqm@hadoop1 bin]$ strmqm QM_TEST
WebSphere MQ queue manager 'QM_TEST' starting.
The queue manager is associated with installation 'Installation1'.
5 log records accessed on queue manager 'QM_TEST' during the log replay phase.
Log replay for queue manager 'QM_TEST' complete.
Transaction manager state recovered for queue manager 'QM_TEST'.
WebSphere MQ queue manager 'QM_TEST' started using V8.0.0.4.
- 运行队列管理器
[mqm@hadoop1 bin]$ runmqsc QM_TEST
5724-H72 (C) Copyright IBM Corp. 1994, 2015.
Starting MQSC for queue manager QM_TEST.
- 定义本地队列
ql:定义的本地队列名 maxdepth:消息最大接收,defpsist 队列重启消息不丢
def ql(QUEUE_RES) maxdepth(10000) defpsist(yes) replace
1 : def ql(QUEUE_RES) maxdepth(10000) defpsist(yes) replace
AMQ8006: WebSphere MQ queue created.
- 定义服务器连接通道
// define channel(CH1) chltype(SVRCONN) trptype(TCP) mcauser(‘mqm’)
DEFINE CHANNEL(CONN_CHANNEL) CHLTYPE(SVRCONN) TRPTYPE(TCP)
1 : DEFINE CHANNEL(CONN_CHANNEL) CHLTYPE(SVRCONN) TRPTYPE(TCP)
AMQ8014: WebSphere MQ channel created.
// 开启通道
start chl(CONN_CHANNEL)
1 : start chl(CONN_CHANNEL)
AMQ8018: Start WebSphere MQ channel accepted.
输入end退出队列管理器控制台。
- 测试发送和接收消息
发送消息
// amqsput queue_name queue_manager_name 发送消息
[mqm@hadoop1 bin]$ amqsput QUEUE_RES QM_TEST
Sample AMQSPUT0 start
target queue is QUEUE_RES
// 输入字符串后,敲回车键结束文字输入
hello ibmmq
Sample AMQSPUT0 end
接收消息
// amqsput queue_name queue_manager_name 发送消息
[mqm@hadoop1 bin]$ amqsget QUEUE_RES QM_TEST
Sample AMQSGET0 start
message <hello ibmmq>
- 启动端口监听(即ibmmq设置端口)
[mqm@hadoop1 bin]$ runmqlsr -m QM_TEST -p 1414 -t tcp &
[1] 2167
[mqm@hadoop1 bin]$ 5724-H72 (C) Copyright IBM Corp. 1994, 2015.
// 查看1414端口是否启用
[mqm@hadoop1 bin]$ netstat -nltp | grep 1414
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::1414 :::* LISTEN 2167/runmqlsr
- 其他设置修改(需要运行队列管理器窗口)
查看队列管理器字符集:
dis qmgr
修改字符集:
alter qmgr CCSID(字符集号)
显示队列管理器中的所有队列:
dis q(*)
查询所有通道
display chl(*)
java代码进行消息的生产和消费
导入MQ安装目录下的依赖包,具体代码如下面所示:
import com.ibm.mq.*;
import org.junit.Before;
import org.junit.Test;
public class IBMMq {
// 队列管理器
public static MQQueueManager QMGR;
// 队列
public static MQQueue QUEUE;
// 请求队列即 接收队列、本地队列
static int reqOpenOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
@Test
public void sendMessage() throws Exception {
// 要写入队列的消息
MQMessage msg = new MQMessage();
msg.format = MQC.MQFMT_STRING;
msg.characterSet = 1381;
msg.encoding = 1381;
MQPutMessageOptions gmo = new MQPutMessageOptions();
// 设置消息用不过期
msg.expiry = -1;
msg.writeString("hello ibmmq");
QUEUE.put(msg,gmo);
System.out.println("发送消息成功");
QUEUE.close();
}
@Test
public void receiveMsg() throws Exception{
int currentDepth = QUEUE.getCurrentDepth();
System.out.println("消息长度:"+currentDepth);
MQMessage msg = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
// 本次接收失败,应该继续循环
QUEUE.get(msg, gmo);
// 接收成功后,启动handler处理接收后的消息
String reqMsgStr = msg.readStringOfByteLength(msg.getDataLength());
System.out.println(reqMsgStr);
}
/**
* ibmMq连接初始化,这里连接的要和上面你配置的Mq对象名称一致
*/
@Before
public void initConn() throws Exception {
MQEnvironment.hostname = "10.211.55.11";
MQEnvironment.port = 1414; // TCP/IP port
MQEnvironment.channel = "test_conn";
MQEnvironment.CCSID = 1381;
// 用户名对应的密码
MQEnvironment.userID = "MUSR_MQADMIN";
// 默认密码为123456
MQEnvironment.password = "123456";
QMGR = new MQQueueManager("Queue_Manager");
QUEUE = QMGR.accessQueue("test_receive",
MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT);
// System.out.println(QUEUE);
}
}
一个简单的入门demo,到此为止。
版权声明:本文为javaee_gao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。