当前位置: 首页 > 知识库问答 >
问题:

如何从com.ibm.mq.mqException恢复:MQJE001:完成代码“2”,原因“2009”

华懿轩
2023-03-14

我正在编写一个从MQ读取分段消息的使用者。我使用Spring JMS/Spring集成来处理其他队列。我知道IBM MQ不支持JMS中的消息分段:(这里的相关问题:如何在Spring integration中组装MQ消息段)

下面是我想出的将IBM MQ类用于java和Spring的方法。

MQ对象的Bean定义。

@Bean
    public MQGetMessageOptions mqGetMessageOptions() {
        MQGetMessageOptions getOptions = new MQGetMessageOptions();
        getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
        getOptions.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_ALL_SEGMENTS_AVAILABLE + CMQC.MQGMO_LOGICAL_ORDER
                + CMQC.MQGMO_COMPLETE_MSG;
        return getOptions;
    }

    @Bean
    public MQQueueManager mqQueueManager() throws Exception {
        Hashtable<String, Object> properties = new Hashtable<String, Object>();
        properties.put(CMQC.CHANNEL_PROPERTY, channel);
        properties.put(CMQC.HOST_NAME_PROPERTY, hostName);
        properties.put(CMQC.PORT_PROPERTY, new Integer(port));
        MQQueueManager qMgr = new MQQueueManager(queueManager, properties);
        return qMgr;
    }

    @Bean
    public MQQueue inboundQueue(@Autowired MQQueueManager mqQueueManager) throws Exception {
        int openOptions = CMQC.MQOO_INPUT_EXCLUSIVE;
        MQQueue inboundQueue = mqQueueManager.accessQueue(inboundQueue, openOptions);
        return inboundQueue;
    }

    @Bean
    public MessageChannel queueConsumerChannel() {
        // return new DirectChannel();
        return new ExecutorChannel(Executors.newFixedThreadPool(5));
    }

消费者代码:

@Component
@Slf4j
public class MyQueueConsumer {

    @Autowired
    MQQueueManager qMgr;

    @Autowired
    MQGetMessageOptions mqGetMessageOptions;

    @Autowired
    MQQueue inboundQueue;

    @Autowired
    MessageChannel queueConsumerChannel;

    @Autowired
    MessageSaveService messageSaveService;


    @EventListener(ApplicationReadyEvent.class)
    public void consume() {
        boolean getMore = true;
        MQMessage receiveMsg = null;
        while (getMore) {
            try {
                receiveMsg = new MQMessage();
                log.info("Waiting to consume mesages from ....");
                inboundQueue.get(receiveMsg, mqGetMessageOptions);
                byte[] b = new byte[receiveMsg.getMessageLength()];
                receiveMsg.readFully(b);
                String fileName = getFileName();
                Message<String> outMessage = MessageBuilder.withPayload(new String(b)).build();
                queueConsumerChannel.send(outMessage);
                log.info("Message consumed and sent to processng channel");
                // qMgr.commit();
            } catch (MQException e) {
                if ((e.completionCode == CMQC.MQCC_WARNING) && (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE)) {
                    log.error("Bottom of the queue reached.");

                    getMore = false;
                } else {
                    log.error("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode + " : EC=" + e.getErrorCode());
                    log.info("Is Connected :" + qMgr.isConnected());
                    log.info("Is open : " + qMgr.isOpen());
                    e.printStackTrace();
                    getMore = false;
                }
            } 
        }

    }

    @PreDestroy
    public void closeMQObjects() {
        System.out.println("Closing MQ objects ");
        try {
            if (inboundQueue != null)
                inboundQueue.close();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
        try {
            if (qMgr != null)
                qMgr.disconnect();
        } catch (MQException e) {
            System.err.println("MQRead CC=" + e.completionCode + " : RC=" + e.reasonCode);
            e.printStackTrace();
        }
    }
}

使用这种配置,使用者可以按照需要工作,它将所有分段的消息组装起来,作为一个完整的消息读取,并在队列中等待下一个消息到达。但我面临的挑战是

共有1个答案

邹晟睿
2023-03-14

与其将inboundqueue定义为bean,不如在while循环中分配它,并在成功结束循环或遇到可重试的异常时close它。

不幸的是,我不能给你一个可重试的异常列表,但2009肯定是其中之一。

延迟重试是一个很好的做法,请参见例如ExponentialBackoff。

 类似资料: