
Working with WebSphere MQ from Java

王叶五
2023-12-01

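1. Introduction

This article shows how to send and receive messages with the native WebSphere MQ Java client (the com.ibm.mq classes).

2. Preparation

Create a sending queue manager and a receiving queue manager in WebSphere MQ, along with their queues, channels, and listeners.

2.1 Receiving queue manager

Create the receiving queue manager and its queue, channels, and listener as follows: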

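# Create the receiving queue manager
crtmqm QUEUE_TWO_MANAGER
# Start the receiving queue manager
strmqm QUEUE_TWO_MANAGER
# Open an MQSC session on the receiving queue manager; the commands below are entered at the runmqsc prompt
runmqsc QUEUE_TWO_MANAGER
# Define the local queue
define ql(QUEUE_TWO)
# Define the receiver channel
define channel(ONE_TWO_CHL) chltype (RCVR) trptype (TCP)
# Define the server-connection channel used by the Java client; mcauser sets the user ID the channel runs under
define channel(ONE_TWO_CONN_CHL) CHLTYPE(SVRCONN) mcauser('mqm')
# Change the listener port to 9004
alter listener(system.default.listener.tcp) trptype(tcp) port(9004)
# Start the listener
start listener(system.default.listener.tcp)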

Note:

# The listener can also be started from the shell with a single command
echo "start listener(system.default.listener.tcp)" | runmqsc QUEUE_TWO_MANAGER

2.2 Sending queue manager

Create the sending queue manager and its queues, channels, and listener as follows:

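# Create the sending queue manager
crtmqm QUEUE_ONE_MANAGER
# Start the sending queue manager
strmqm QUEUE_ONE_MANAGER
# Open an MQSC session on the sending queue manager; the commands below are entered at the runmqsc prompt
runmqsc QUEUE_ONE_MANAGER
# Define the transmission queue
define qlocal(QUEUE_TR) usage (xmitq)
# Define the remote queue definition; this is the queue name the sender puts messages to
define qremote(QUEUE_TWO) rname (QUEUE_TWO) rqmname(QUEUE_TWO_MANAGER) xmitq (QUEUE_TR)
# Define the sender channel; it connects to the receiving queue manager's listener at 127.0.0.1(9004)
define channel(ONE_TWO_CHL) chltype(sdr) conname('127.0.0.1(9004)') xmitq(QUEUE_TR) trptype(tcp)
# Start the sender channel
start channel(ONE_TWO_CHL)
# Define the server-connection channel used by the Java client; mcauser sets the user ID the channel runs under
define channel(ONE_TWO_CONN_CHL) CHLTYPE(SVRCONN) mcauser('mqm')
# Change the listener port to 9003
alter listener(system.default.listener.tcp) trptype(tcp) port(9003)
# Start the listener
start listener(system.default.listener.tcp)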
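
Before moving on to the Java client, it can help to confirm that both listeners accept TCP connections. The following is a minimal sketch using only the JDK; the class name ListenerCheck and the method isListening are hypothetical and not part of the original project. A successful connection only shows that something is listening on the port, not that it is an MQ listener.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
class ListenerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Ports taken from the listener definitions in sections 2.1 and 2.2.
        System.out.println("9003 (QUEUE_ONE_MANAGER): " + isListening("127.0.0.1", 9003, 1000));
        System.out.println("9004 (QUEUE_TWO_MANAGER): " + isListening("127.0.0.1", 9004, 1000));
    }
}
```

If either port reports false, check that the queue manager is running and that its listener was started.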

3. Usage example

This example uses Spring Boot to send a message from the sending queue manager to the receiving queue manager.

3.1 Add the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.dragon.study</groupId>
    <artifactId>spring-boot-websperemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websperemq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>9.1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.2 Define the MQ configuration

Add the following to application.yaml:

raw:
  mq:
    host: localhost
    username: mqm
    password: root
    sendPort: 9003
    receivePort: 9004
    sendQueueManager: QUEUE_ONE_MANAGER
    receiveQueueManager: QUEUE_TWO_MANAGER
    connectChannel: ONE_TWO_CONN_CHL
    queue: QUEUE_TWO
    ccsid: 1208  #utf-8
    receiveTimeout: 2000
    charset: utf-8
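
The ccsid and charset settings describe the same thing from two sides: the CCSID is what the message header declares, and the charset is what Java uses to encode and decode the body, so the two need to agree. A minimal sketch of a consistency check follows; the class CcsidCharsets and its small lookup table are assumptions made for illustration and cover only a few well-known CCSIDs.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the table covers only a few well-known CCSIDs.
class CcsidCharsets {
    private static final Map<Integer, Charset> KNOWN = new HashMap<>();
    static {
        KNOWN.put(1208, StandardCharsets.UTF_8);
        KNOWN.put(819, StandardCharsets.ISO_8859_1);
        KNOWN.put(367, StandardCharsets.US_ASCII);
    }

    // True if charsetName names the Java charset that corresponds to ccsid.
    // Unknown CCSIDs return false; an unknown charset name makes
    // Charset.forName throw an unchecked exception.
    static boolean matches(int ccsid, String charsetName) {
        Charset expected = KNOWN.get(ccsid);
        return expected != null && expected.equals(Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        // Values from application.yaml: ccsid 1208, charset utf-8.
        System.out.println(matches(1208, "utf-8"));
    }
}
```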

Define the configuration properties class RawMqParam.java:

package com.dragon.study.springbootwebsperemq.rawMq.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Hashtable;

@Configuration
@ConfigurationProperties(prefix = "raw.mq", ignoreInvalidFields = true)
@Data
public class RawMqParam {
    private String host;
    private String username;
    private String password;
    private int sendPort;
    private int receivePort;
    private String sendQueueManager;
    private String receiveQueueManager;
    private String connectChannel;
    private String queue;
    private int ccsid;
    private int receiveTimeout;
    private String charset;

    public Hashtable<String, Object> getSendCfg(){
        return buildCfg(sendPort);
    }

    public Hashtable<String, Object> getReceiveCfg(){
        return buildCfg(receivePort);
    }

    public Hashtable<String, Object> buildCfg(int port){
        Hashtable<String, Object> cfg = new Hashtable<>();
        cfg.put("hostname",host);
        cfg.put("port",port);
        cfg.put("userID",username);
        cfg.put("password",password);
        cfg.put("CCSID",ccsid);
        cfg.put("channel",connectChannel);
        return cfg;
    }
}

3.3 Define the MQ connection manager configuration

The connection manager configuration class RawMqConfig.java is as follows:

package com.dragon.study.springbootwebsperemq.rawMq.config;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQSimpleConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
public class RawMqConfig {

    @Resource
    private RawMqParam rawMqParam;

    // Connection manager, i.e. a connection pool used to reuse connections
    @Bean
    public MQSimpleConnectionManager receiveMqSimpleConnectionManager() {
        return buildMqSimpleConnectionManager(rawMqParam.getReceivePort());
    }

    // Connection manager, i.e. a connection pool used to reuse connections
    @Bean
    public MQSimpleConnectionManager sendMqSimpleConnectionManager() {
        return buildMqSimpleConnectionManager(rawMqParam.getSendPort());
    }

    MQSimpleConnectionManager buildMqSimpleConnectionManager(int port) {
        // Global connection defaults (set them here only if every connection shares the same settings;
        // otherwise pass the settings per connection, as RawMqService does)
//        MQEnvironment.hostname = rawMqParam.getHost();
//        MQEnvironment.port = port;
//        MQEnvironment.userID = rawMqParam.getUsername();
//        MQEnvironment.password = rawMqParam.getPassword();
//        MQEnvironment.CCSID = rawMqParam.getCcsid();
//        MQEnvironment.channel = rawMqParam.getConnectChannel();
//        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);

        // Create the connection pool manager
        MQSimpleConnectionManager cm = new MQSimpleConnectionManager();
        cm.setActive(MQSimpleConnectionManager.MODE_ACTIVE); // keep the pool always active (the default mode is MODE_AUTO)
        cm.setMaxConnections(10); // maximum number of connections managed by the pool
        cm.setMaxUnusedConnections(5); // maximum number of unused connections kept in the pool
        MQEnvironment.setDefaultConnectionManager(cm);  // register as the default connection manager; the default is global, so the last bean created wins
        return cm;
    }
}
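
To make the two limits configured above more concrete, here is a toy pool written with only the JDK. It is an illustration of what a cap on total connections and a cap on unused connections mean, not a description of how MQSimpleConnectionManager is implemented; the class ToyPool and all of its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: it mimics the two limits, not IBM's implementation.
class ToyPool {
    private final int maxConnections; // cap on connections handed out at the same time
    private final int maxUnused;      // cap on idle connections kept for reuse
    private final Deque<Object> idle = new ArrayDeque<>();
    private int inUse = 0;

    ToyPool(int maxConnections, int maxUnused) {
        this.maxConnections = maxConnections;
        this.maxUnused = maxUnused;
    }

    // Reuse an idle connection if there is one, otherwise create one while under the cap.
    synchronized Object acquire() {
        Object conn = idle.pollFirst();
        if (conn == null) {
            if (inUse >= maxConnections) {
                throw new IllegalStateException("pool exhausted");
            }
            conn = new Object(); // stands in for a real connection
        }
        inUse++;
        return conn;
    }

    // Keep the connection for reuse unless the idle list is already full.
    synchronized void release(Object conn) {
        inUse--;
        if (idle.size() < maxUnused) {
            idle.addFirst(conn);
        }
        // otherwise the connection is dropped; a real pool would close it
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

With new ToyPool(10, 5), at most ten connections can be out at once, and at most five released connections are kept around for the next caller.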

3.4 Define the message send and receive service

The service class RawMqService.java is as follows:

package com.dragon.study.springbootwebsperemq.rawMq.service;

import com.dragon.study.springbootwebsperemq.rawMq.config.RawMqParam;
import com.ibm.mq.*;
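// Note: this MQC class lives in an internal package; com.ibm.mq.constants.CMQC is the public home of these constants
import com.ibm.msg.client.wmq.compat.base.internal.MQC;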
import lombok.SneakyThrows;
import org.apache.tomcat.util.buf.HexUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

@Service
public class RawMqService {
    @Resource
    private MQSimpleConnectionManager sendMqSimpleConnectionManager;
    @Resource
    private MQSimpleConnectionManager receiveMqSimpleConnectionManager;
    @Resource
    private RawMqParam rawMqParam;

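    /**
     * Send a message.
     * @param msg message body
     * @param correlationId correlation id to set on the message
     * @return the message id generated by the queue manager, or null if sending failed
     */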
    @SneakyThrows
    public byte[] sendMsg(String msg, byte[] correlationId) {
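        // Queue manager handle
        MQQueueManager qm = null;
        // Queue handle
        MQQueue queue = null;
        try {
            // Connect to the queue manager using the global MQEnvironment defaults
//            qm = new MQQueueManager(rawMqParam.getSendQueueManager(), sendMqSimpleConnectionManager);
            // Connect to the queue manager using per-connection properties
            qm = new MQQueueManager(rawMqParam.getSendQueueManager(),rawMqParam.getSendCfg(), sendMqSimpleConnectionManager);
            // Open the queue. Commonly used options:
//            MQC.MQOO_OUTPUT: open the queue for putting messages
//            MQC.MQOO_FAIL_IF_QUIESCING: fail the call if the queue manager is quiescing
//            MQC.MQOO_INPUT_AS_Q_DEF: open for destructive gets, using the queue's default input mode
//            MQC.MQOO_BROWSE: open for browsing; messages stay on the queue
//            MQC.MQOO_INQUIRE: open for inquiring attributes such as the current depth
//            MQC.MQGMO_WAIT: a get option (not an open option); wait for a message if the queue is empty
            // MQPMO_NEW_MSG_ID is a put option, so it belongs in pmo.options below, not in the open options
            queue = qm.accessQueue(rawMqParam.getQueue(), MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);
            // Put-message options
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            pmo.options = pmo.options | MQC.MQPMO_NEW_MSG_ID;  // have the queue manager generate a unique message id for every put
            // Build the message
            MQMessage m = new MQMessage();
            m.format = MQC.MQFMT_STRING; // MQMD Format field: the body is character data
//            m.messageId = msgId == null ? MQC.MQMI_NONE : msgId.getBytes(rawMqParam.getCharset());
//            m.messageId = MQC.MQMI_NONE;
            // m.encoding is left at its default: it describes numeric encoding (MQENC_*), not the CCSID
            m.characterSet = rawMqParam.getCcsid(); // MQMD CodedCharSetId; must match the charset used to encode the body
            m.correlationId = correlationId;
            m.write(msg.getBytes(rawMqParam.getCharset()));  // write the message body
            // Put the message
            queue.put(m, pmo);
            System.out.println("send, hex sendMsgId:"+ HexUtils.toHexString(m.messageId));
            return m.messageId;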
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(queue, qm);
        }
        return null;
    }

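    /**
     * Receive a message.
     * @param correlationId if not null, get the message with this correlation id
     * @param msgId if correlationId is null and msgId is not, get the message with this message id
     * @return the message body, or null if no message was received
     */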
    public String receiveMsg(byte[] correlationId, byte[] msgId) {
        // Queue manager handle
        MQQueueManager qm = null;
        // Queue handle
        MQQueue queue = null;
        try {
            // Connect to the receiving queue manager
            qm = new MQQueueManager(rawMqParam.getReceiveQueueManager(),rawMqParam.getReceiveCfg() ,receiveMqSimpleConnectionManager);
            queue = qm.accessQueue(rawMqParam.getQueue(), MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING);
            System.out.println("receiveQueue curDepth:"+queue.getCurrentDepth());
            // format, encoding and characterSet are not set here: without MQGMO_CONVERT the get call fills them in
            MQMessage m = new MQMessage();

            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.options = MQC.MQGMO_WAIT; // waitInterval only applies when MQGMO_WAIT is set
            gmo.waitInterval = rawMqParam.getReceiveTimeout(); // wait up to receiveTimeout milliseconds for a message
            if (null != correlationId) {
                gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID; // match the message by correlationId
                m.correlationId = correlationId;
            }else if(null != msgId) {
                gmo.matchOptions = MQC.MQMO_MATCH_MSG_ID; // match the message by msgId
                m.messageId = msgId;
            }
            queue.get(m, gmo);
            System.out.println("receive, hex sendMsgId:"+ HexUtils.toHexString(m.messageId));

            byte[] bu = new byte[m.getMessageLength()];
            m.readFully(bu);
            String msg = new String(bu, rawMqParam.getCharset()); // decode with the configured charset, not the platform default
            return msg;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(queue, qm);
        }
        return null;
    }

    // Close the queue and disconnect from the queue manager, returning the connection to the pool
    public void close(MQQueue q, MQQueueManager qm) {
        try {
            if (q != null && q.isOpen()) {
                q.close();
            }
            if (qm != null) {
                qm.disconnect();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
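
The service above only prints the stack trace when a call fails. An MQException carries a numeric reasonCode field, and translating the most common codes into a hint makes failures much easier to diagnose. The following lookup is a minimal sketch using only the JDK; the class MqReasonHints is hypothetical, and the table lists just a handful of reason codes that are likely to come up with this setup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: maps a few common MQ reason codes to a short hint.
class MqReasonHints {
    private static final Map<Integer, String> HINTS = new HashMap<>();
    static {
        HINTS.put(2033, "MQRC_NO_MSG_AVAILABLE: no matching message was available before the wait ended");
        HINTS.put(2035, "MQRC_NOT_AUTHORIZED: the user may not connect or open the object");
        HINTS.put(2059, "MQRC_Q_MGR_NOT_AVAILABLE: the queue manager is not running or not reachable");
        HINTS.put(2085, "MQRC_UNKNOWN_OBJECT_NAME: the queue is not defined on this queue manager");
        HINTS.put(2538, "MQRC_HOST_NOT_AVAILABLE: check the host, the port, and the listener");
        HINTS.put(2540, "MQRC_UNKNOWN_CHANNEL_NAME: the channel is not defined on this queue manager");
    }

    // Returns the hint for a known code, or a generic message otherwise.
    static String describe(int reasonCode) {
        String hint = HINTS.get(reasonCode);
        return hint != null ? hint : "reason code " + reasonCode;
    }

    public static void main(String[] args) {
        System.out.println(describe(2033));
    }
}
```

In the catch blocks of RawMqService, one could catch MQException separately and log MqReasonHints.describe(e.reasonCode) alongside the stack trace.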

3.5 Other classes

The entity class Stu.java:

package com.dragon.study.springbootwebsperemq.mq.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;

@Data
@AllArgsConstructor
public class Stu implements Serializable {
    private String  name;

    @Override
    public String toString() {
        return "Stu{" +
                "name='" + name + '\'' +
                '}';
    }
}

The application class SpringBootWebsperemqApplication.java is as follows:

package com.dragon.study.springbootwebsperemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootWebsperemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootWebsperemqApplication.class, args);
    }
}

3.6 Test

The send and receive test is as follows:

package com.dragon.study.springbootwebsperemq;

import com.dragon.study.springbootwebsperemq.rawMq.service.RawMqService;
import org.apache.tomcat.util.buf.HexUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class RawMqTests {
    @Resource
    private RawMqService rawMqService;

    @Test
    public void rawMsgTest() {
        byte[] correlationId = "001".getBytes();
        // Send the message; the message id is returned
        byte[] msgId = rawMqService.sendMsg("hello1", correlationId);
        // Print the message id; it is a raw byte array, so encode it as hex for display
        String msgIdStr = HexUtils.toHexString(msgId);
        System.out.println(msgIdStr);
        // Get the message by message id
//        String msg = rawMqService.receiveMsg(null, msgId);
        // Get the message by correlation id
        String msg = rawMqService.receiveMsg(correlationId, null);
        System.out.println("receive msg:" + msg);
    }
}
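
The message id and correlation id in the MQ message descriptor are fixed 24-byte fields, which is why the test prints them as hex rather than as text. The sketch below shows, with only the JDK, how a short value such as "001" looks once padded to 24 bytes and hex-encoded. The class MqIds and its methods are hypothetical; padding explicitly is a choice made here to make the field length visible, not something the original code does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for building and printing 24-byte ids.
class MqIds {
    static final int ID_LENGTH = 24; // length of the MQMD MsgId and CorrelId fields

    // Right-pad with zero bytes (or truncate) to exactly 24 bytes.
    static byte[] toId(String text) {
        return Arrays.copyOf(text.getBytes(StandardCharsets.UTF_8), ID_LENGTH);
    }

    // Lower-case hex encoding, two characters per byte.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(toId("001")));
    }
}
```

The hex string for "001" starts with 303031 (the ASCII codes of the three characters) followed by zero padding, 48 hex characters in total.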
