这里介绍使用webspheremq原生java客户端,进行消息的发送和接收。
在webspheremq中创建发送队列管理器和接收队列管理器,及相关队列、监听器。
接收队列管理器及相关队列、监听器创建如下:
# Create the receiving queue manager
crtmqm QUEUE_TWO_MANAGER
# Start the receiving queue manager
strmqm QUEUE_TWO_MANAGER
# Open an MQSC session on the receiving queue manager.
# Inside runmqsc a comment line must start with '*' in the first column;
# '#' is not an MQSC comment marker and must not trail a command.
runmqsc QUEUE_TWO_MANAGER
* Define the local queue that holds the received messages
define ql(QUEUE_TWO)
* Define the receiver channel
define channel(ONE_TWO_CHL) chltype (RCVR) trptype (TCP)
* Define the server-connection channel used by the Java client;
* mcauser sets the user ID the channel uses for authority checks
define channel(ONE_TWO_CONN_CHL) CHLTYPE(SVRCONN) mcauser('mqm')
* Change the listener port to 9004
alter listener(system.default.listener.tcp) trptype(tcp) port(9004)
* Start the listener
start listener(system.default.listener.tcp)
备注:
# The listener can also be started from the shell by piping the MQSC command into runmqsc
echo "start listener(system.default.listener.tcp)" | runmqsc QUEUE_TWO_MANAGER
发送队列管理器及相关队列、监听器创建如下:
# Create the sending queue manager
crtmqm QUEUE_ONE_MANAGER
# Start the sending queue manager
strmqm QUEUE_ONE_MANAGER
# Open an MQSC session on the sending queue manager.
# Inside runmqsc a comment line must start with '*' in the first column;
# '#' is not an MQSC comment marker and must not trail a command.
runmqsc QUEUE_ONE_MANAGER
* Define the transmission queue
define qlocal(QUEUE_TR) usage (xmitq)
* Define the remote queue definition; this is the queue the application puts to when sending
define qremote(QUEUE_TWO) rname (QUEUE_TWO) rqmname(QUEUE_TWO_MANAGER) xmitq (QUEUE_TR)
* Define the sender channel; it connects to the receiving queue manager's listener at 127.0.0.1(9004)
define channel(ONE_TWO_CHL) chltype(sdr) conname('127.0.0.1(9004)') xmitq(QUEUE_TR) trptype(tcp)
* Start the sender channel
start channel(ONE_TWO_CHL)
* Define the server-connection channel used by the Java client;
* mcauser sets the user ID the channel uses for authority checks
define channel(ONE_TWO_CONN_CHL) CHLTYPE(SVRCONN) mcauser('mqm')
* Change the listener port to 9003
alter listener(system.default.listener.tcp) trptype(tcp) port(9003)
* Start the listener
start listener(system.default.listener.tcp)
这里以 Spring Boot 项目为例,演示从发送队列管理器发送消息到接收队列管理器。项目的 pom.xml 如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot demo that talks to IBM MQ with the native Java client. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.dragon.study</groupId>
    <artifactId>spring-boot-websperemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websperemq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot starters and tooling (no explicit version declared here) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- JMS support -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>

        <!-- IBM MQ client classes (provides the com.ibm.mq.* API used by the service) -->
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>9.1.1.0</version>
        </dependency>

        <!-- Utility libraries -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
在application.yaml定义如下:
# Connection settings for the raw IBM MQ Java client.
# The keys must be nested under raw -> mq so that they bind to the "raw.mq" property prefix.
raw:
  mq:
    host: localhost
    username: mqm
    password: root
    # Listener port of the sending queue manager
    sendPort: 9003
    # Listener port of the receiving queue manager
    receivePort: 9004
    sendQueueManager: QUEUE_ONE_MANAGER
    receiveQueueManager: QUEUE_TWO_MANAGER
    # Server-connection channel used by the Java client
    connectChannel: ONE_TWO_CONN_CHL
    queue: QUEUE_TWO
    ccsid: 1208 # UTF-8
    receiveTimeout: 2000
    charset: utf-8
定义配置参数类RawMqParam.java:
package com.dragon.study.springbootwebsperemq.rawMq.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Hashtable;
/**
 * Connection settings for the raw IBM MQ client, bound from the {@code raw.mq.*}
 * configuration properties. Accessors are generated by Lombok's {@code @Data}.
 */
@Configuration
@ConfigurationProperties(prefix = "raw.mq", ignoreInvalidFields = true)
@Data
public class RawMqParam {
    /** Host name put into the connection properties. */
    private String host;
    /** User ID put into the connection properties. */
    private String username;
    /** Password put into the connection properties. */
    private String password;
    /** Port used when building the sending-side connection properties. */
    private int sendPort;
    /** Port used when building the receiving-side connection properties. */
    private int receivePort;
    /** Name of the sending queue manager. */
    private String sendQueueManager;
    /** Name of the receiving queue manager. */
    private String receiveQueueManager;
    /** Name of the client connection channel. */
    private String connectChannel;
    /** Name of the queue to open. */
    private String queue;
    /** Coded character set identifier put into the connection properties. */
    private int ccsid;
    /** Receive timeout as configured; not used by this class itself. */
    private int receiveTimeout;
    /** Name of the Java charset used for message bodies. */
    private String charset;

    /**
     * Builds the connection properties for the sending side.
     *
     * @return a new property table that uses {@code sendPort}
     */
    public Hashtable<String, Object> getSendCfg() {
        return this.buildCfg(this.sendPort);
    }

    /**
     * Builds the connection properties for the receiving side.
     *
     * @return a new property table that uses {@code receivePort}
     */
    public Hashtable<String, Object> getReceiveCfg() {
        return this.buildCfg(this.receivePort);
    }

    /**
     * Builds a connection property table for the given port.
     *
     * <p>{@link Hashtable} rejects {@code null} values, so every string setting
     * must be configured before this method is called.
     *
     * @param port the port to connect to
     * @return a new table holding host, port, credentials, CCSID and channel
     */
    public Hashtable<String, Object> buildCfg(int port) {
        final Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("hostname", this.host);
        properties.put("port", port);
        properties.put("userID", this.username);
        properties.put("password", this.password);
        properties.put("CCSID", this.ccsid);
        properties.put("channel", this.connectChannel);
        return properties;
    }
}
mq连接管理类RawMqConfig.java如下:
package com.dragon.study.springbootwebsperemq.rawMq.config;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQSimpleConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * Declares the IBM MQ connection managers (connection pools) as Spring beans,
 * one for the receiving side and one for the sending side.
 */
@Configuration
public class RawMqConfig {
    /** Value passed to {@code setMaxConnections} for every pool. */
    private static final int MAX_CONNECTIONS = 10;
    /** Value passed to {@code setMaxUnusedConnections} for every pool. */
    private static final int MAX_UNUSED_CONNECTIONS = 5;

    @Resource
    private RawMqParam rawMqParam;

    /**
     * Connection manager (pool) used to reuse connections on the receiving side.
     *
     * @return a newly configured connection manager
     */
    @Bean
    public MQSimpleConnectionManager receiveMqSimpleConnectionManager() {
        final int receivePort = rawMqParam.getReceivePort();
        return buildMqSimpleConnectionManager(receivePort);
    }

    /**
     * Connection manager (pool) used to reuse connections on the sending side.
     *
     * @return a newly configured connection manager
     */
    @Bean
    public MQSimpleConnectionManager sendMqSimpleConnectionManager() {
        final int sendPort = rawMqParam.getSendPort();
        return buildMqSimpleConnectionManager(sendPort);
    }

    /**
     * Creates and configures a connection manager.
     *
     * <p>NOTE(review): every call also installs the new manager as the default
     * connection manager, so the manager created last replaces the earlier one
     * as the default.
     *
     * @param port currently unused; it is only referenced by the commented-out
     *             global initialisation below
     * @return the configured connection manager
     */
    MQSimpleConnectionManager buildMqSimpleConnectionManager(int port) {
        // Global initialisation (usable when all connections share the same settings;
        // otherwise the settings can be passed per connection later on):
        // MQEnvironment.hostname = rawMqParam.getHost();
        // MQEnvironment.port = port;
        // MQEnvironment.userID = rawMqParam.getUsername();
        // MQEnvironment.password = rawMqParam.getPassword();
        // MQEnvironment.CCSID = rawMqParam.getCcsid();
        // MQEnvironment.channel = rawMqParam.getConnectChannel();
        // MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);

        final MQSimpleConnectionManager pool = new MQSimpleConnectionManager();
        // NOTE(review): the original comment called MODE_ACTIVE the default mode;
        // IBM's documentation appears to list MODE_AUTO as the default — confirm.
        pool.setActive(MQSimpleConnectionManager.MODE_ACTIVE);
        pool.setMaxConnections(MAX_CONNECTIONS);
        pool.setMaxUnusedConnections(MAX_UNUSED_CONNECTIONS);
        // Register this pool as the default connection manager.
        MQEnvironment.setDefaultConnectionManager(pool);
        return pool;
    }
}
服务类RawMqService.java如下:
package com.dragon.study.springbootwebsperemq.rawMq.service;
import com.dragon.study.springbootwebsperemq.rawMq.config.RawMqParam;
import com.ibm.mq.*;
import com.ibm.msg.client.wmq.compat.base.internal.MQC;
import lombok.SneakyThrows;
import org.apache.tomcat.util.buf.HexUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * Sends and receives messages with the native IBM MQ classes for Java.
 *
 * <p>Each call opens its own queue manager connection through the matching
 * connection manager and releases it again before returning.
 */
@Service
public class RawMqService {
    @Resource
    private MQSimpleConnectionManager sendMqSimpleConnectionManager;
    @Resource
    private MQSimpleConnectionManager receiveMqSimpleConnectionManager;
    @Resource
    private RawMqParam rawMqParam;

    /**
     * Puts a message on the configured queue of the sending queue manager.
     *
     * @param msg           the message text; encoded with the configured charset
     * @param correlationId correlation id to set on the message, or {@code null}
     *                      to keep the message's default correlation id
     * @return the message id set on the message after the put, or {@code null}
     *         if sending failed (the exception is printed, not rethrown)
     */
    public byte[] sendMsg(String msg, byte[] correlationId) {
        MQQueueManager qm = null;
        MQQueue queue = null;
        try {
            // Connect with per-connection settings instead of the global MQEnvironment defaults.
            qm = new MQQueueManager(rawMqParam.getSendQueueManager(), rawMqParam.getSendCfg(), sendMqSimpleConnectionManager);
            // Open options only: MQPMO_NEW_MSG_ID is a put option and must not be mixed in here,
            // where its numeric value would be interpreted as a different open option.
            // MQOO_FAIL_IF_QUIESCING: fail if the queue manager is quiescing.
            queue = qm.accessQueue(rawMqParam.getQueue(), MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);

            MQPutMessageOptions pmo = new MQPutMessageOptions();
            // Generate a new unique message id for every put; option flags are combined with OR.
            pmo.options |= MQC.MQPMO_NEW_MSG_ID;

            MQMessage m = new MQMessage();
            m.format = MQC.MQFMT_STRING;
            // Only the character set is taken from the CCSID setting. The encoding field describes
            // numeric representation, not a character set, so it keeps its default value.
            m.characterSet = rawMqParam.getCcsid();
            if (correlationId != null) {
                m.correlationId = correlationId;
            }
            m.write(msg.getBytes(rawMqParam.getCharset()));

            queue.put(m, pmo);
            System.out.println("send, hex sendMsgId:"+ HexUtils.toHexString(m.messageId));
            return m.messageId;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(queue, qm);
        }
        return null;
    }

    /**
     * Gets a message from the configured queue of the receiving queue manager.
     *
     * <p>The get waits for a matching message for up to the configured
     * {@code receiveTimeout} (passed unchanged as the wait interval).
     *
     * @param correlationId if non-null, only a message with this correlation id is taken
     * @param msgId         if {@code correlationId} is null and this is non-null,
     *                      only a message with this message id is taken
     * @return the message body decoded with the configured charset, or {@code null}
     *         if no message was obtained or an error occurred (the exception is printed)
     */
    public String receiveMsg(byte[] correlationId, byte[] msgId) {
        MQQueueManager qm = null;
        MQQueue queue = null;
        try {
            qm = new MQQueueManager(rawMqParam.getReceiveQueueManager(), rawMqParam.getReceiveCfg(), receiveMqSimpleConnectionManager);
            // MQOO_INPUT_AS_Q_DEF: destructive get; MQOO_INQUIRE: needed to read the current depth.
            queue = qm.accessQueue(rawMqParam.getQueue(), MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING);
            System.out.println("receiveQueue curDepth:"+queue.getCurrentDepth());

            MQMessage m = new MQMessage();
            m.format = MQC.MQFMT_STRING;
            m.characterSet = rawMqParam.getCcsid();

            MQGetMessageOptions gmo = new MQGetMessageOptions();
            // waitInterval only takes effect together with MQGMO_WAIT; without it the get
            // returns immediately when the queue holds no matching message.
            gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.waitInterval = rawMqParam.getReceiveTimeout();
            if (null != correlationId) {
                // Match the message by correlation id.
                gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
                m.correlationId = correlationId;
            } else if (null != msgId) {
                // Match the message by message id.
                gmo.matchOptions = MQC.MQMO_MATCH_MSG_ID;
                m.messageId = msgId;
            }

            queue.get(m, gmo);
            System.out.println("receive, hex sendMsgId:"+ HexUtils.toHexString(m.messageId));
            byte[] body = new byte[m.getMessageLength()];
            m.readFully(body);
            // Decode with the same charset the sender used rather than the platform default.
            return new String(body, rawMqParam.getCharset());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(queue, qm);
        }
        return null;
    }

    /**
     * Closes the queue and disconnects the queue manager.
     *
     * <p>The two steps are guarded separately so that a failure while closing
     * the queue does not prevent the queue manager from being disconnected.
     *
     * @param q  the queue to close; may be {@code null}
     * @param qm the queue manager to disconnect; may be {@code null}
     */
    public void close(MQQueue q, MQQueueManager qm) {
        try {
            if (q != null && q.isOpen()) {
                q.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            if (qm != null) {
                qm.disconnect();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
定义实体Stu.java:
package com.dragon.study.springbootwebsperemq.mq.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
/**
 * Serializable bean with a single {@code name} property. Lombok generates the
 * accessors and the all-arguments constructor.
 */
@Data
@AllArgsConstructor
public class Stu implements Serializable {
    private String name;

    /**
     * Renders the bean as {@code Stu{name='<name>'}}.
     *
     * @return the text form of this bean
     */
    @Override
    public String toString() {
        return String.format("Stu{name='%s'}", name);
    }
}
启动类SpringBootWebsperemqApplication.java如下:
package com.dragon.study.springbootwebsperemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the demo application.
 */
@SpringBootApplication
public class SpringBootWebsperemqApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments handed through to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = SpringBootWebsperemqApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
发送、接收消息测试如下:
package com.dragon.study.springbootwebsperemq;
import com.dragon.study.springbootwebsperemq.rawMq.service.RawMqService;
import org.apache.tomcat.util.buf.HexUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
/**
 * Round-trip check for {@link RawMqService}: sends one message and reads it back
 * by correlation id. The results are printed rather than asserted.
 */
@SpringBootTest
class RawMqTests {
    @Resource
    private RawMqService rawMqService;

    /**
     * Sends {@code "hello1"} with correlation id {@code "001"}, prints the returned
     * message id as hex, then receives by correlation id and prints the body.
     */
    @Test
    public void rawMsgTest() {
        final byte[] correlId = "001".getBytes();

        // Send the message; the service returns the message id as raw bytes.
        final byte[] sentMessageId = rawMqService.sendMsg("hello1", correlId);
        // Print the message id hex-encoded.
        System.out.println(HexUtils.toHexString(sentMessageId));

        // Receive by correlation id.
        // To receive by message id instead: rawMqService.receiveMsg(null, sentMessageId)
        final String received = rawMqService.receiveMsg(correlId, null);
        System.out.println("receive msg:" + received);
    }
}