我正在编写一个从MQ读取分段消息的使用者。我使用Spring JMS/Spring集成来处理其他队列。我知道IBM MQ不支持JMS中的消息分段:(这里的相关问题:如何在Spring integration中组装MQ消息段)
下面是我想出的将IBM MQ类用于java和Spring的方法。
MQ对象的Bean定义。
@Bean
public MQGetMessageOptions mqGetMessageOptions() {
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
+ CMQC.MQGMO_COMPLETE_MSG;
return getOptions;
}
@Bean
public MQQueueManager mqQueueManager() throws Exception {
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put(CMQC.CHANNEL_PROPERTY, channel);
properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
properties.put(CMQC.PORT_PROPERTY, new Integer(port));
MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
return qMgr;
}
@Bean
public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
return inboundQueue;
}
@Bean
public MessageChannel queueConsumerChannel() {
// return new DirectChannel();
return new ExecutorChannel(Executors.newFixedThreadPool(5));
}
消费者代码:
@Component
@Slf4j
public class MyQueueConsumer {
@Autowired
MQQueueManager qMgr;
@Autowired
MQGetMessageOptions mqGetMessageOptions;
@Autowired
MQQueue inboundQueue;
@Autowired
MessageChannel queueConsumerChannel;
@Autowired
MessageSaveService messageSaveService;
@EventListener(ApplicationReadyEvent.class)
public void consume() {
boolean getMore = true;
MQMessage receiveMsg = null;
while (getMore) {
try {
receiveMsg = new MQMessage();
log.info("Waiting to consume mesages from ....");
inboundQueue.get(receiveMsg, mqGetMessageOptions);
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
String fileName = getFileName();
Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
queueConsumerChannel.send(outMessage);
log.info("Message consumed and sent to processng channel");
// qMgr.commit();
} catch (MQException e) {
if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
log.error("Bottom of the queue reached.");
getMore = false;
} else {
log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
log.info("Is Connected :" + qMgr.isConnected());
log.info("Is open : " + qMgr.isOpen());
e.printStackTrace();
getMore = false;
}
}
}
}
@PreDestroy
public void closeMQObjects() {
System.out.println("Closing MQ objects ");
try {
if (inboundQueue != null)
inboundQueue.close();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
try {
if (qMgr != null)
qMgr.disconnect();
} catch (MQException e) {
System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
e.printStackTrace();
}
}
}
使用这种配置,使用者可以按照需要工作,它将所有分段的消息组装起来,作为一个完整的消息读取,并在队列中等待下一个消息到达。但我面临的挑战是
与其将inboundqueue
定义为bean,不如在while循环中分配它,并在成功结束循环或遇到可重试的异常时close
它。
不幸的是,我不能给你一个可重试的异常列表,但2009
肯定是其中之一。
延迟重试是一个很好的做法,请参见例如ExponentialBackoff。
我在从MQ本地队列获取消息时遇到以下异常。这是我的连接代码。运行该代码后,我遇到以下异常 这是我的代码
当我尝试测试我的非常简单的消息流时,我得到了这个错误: MQJE001:完成代码“%2”,原因“2495”。 我看过与这个问题有关的其他问题,但没有一个解决方案能帮助我解决这个问题。 我也尝试过仅仅部署这个应用程序并将消息放到队列中,但是MQInput节点没有得到任何消息,即使在失败输出(转换)中也是如此,并且MQExplorer在队列中显示了一条消息。 我正在使用本地集成节点和本地队列管理器。
你能给我解释一下这个问题的原因吗?非常感谢。
使用以下SSLCipherSuite创建MQQueueManager对象时: 我想问题出在MQ配置端,因为我的jre 返回“TLS_RSA_WITH_AES_128_CBC_SHA”作为密码套件之一。但我不确定QMgr配置到底出了什么问题。谢谢你的提示。
在平台上使用WebSphere MQ 7.0.1.6版本:WebSphere MQ for AIX模式:64位O/S:AIX 6.1。 我尝试从一个运行在WebLogic11.g上的简单OS项目连接到“绑定”模式类型的MQ。我得到以下信息: 我在ibm站点上找到的步骤没有得到任何结果。 请提出解决方案。