我在开发订阅MQ主题(MQ版本9)应用程序时遇到了问题。
我需要做一个共享主题连接,因为应用程序将在多个实例(集群)中运行。
规范和文档中说:“非持久共享订阅由一个客户端使用,该客户端需要能够在多个使用者之间共享从主题订阅接收消息的工作。因此,非持久共享订阅可能有多个使用者。来自订阅的每条消息将只传递给该订阅上的一个使用者。”
对我来说,所有使用相同订阅名称的客户端都在同一个“集群”中,一次只有一个客户端会收到一条消息。
在我的代码中,受本文的启发,当第二个客户机尝试创建共享订阅时,我遇到了一个异常。我真的不明白这是MQ客户机库实现中的一个bug还是我的代码中的一个bug。
这里是我的示例代码:
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import com.ibm.mq.jms.MQTopicConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
public class TestGB2 {
public static void main(final String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(new MyThread("THREAD" + i, "TESTSUB/#", "myClient", "SUBTEST")).start();
}
}
public static class MyThread implements Runnable {
private final String topicString;
private final String clientId;
private final String subscriptionName;
public MyThread(final String threadName, final String topicString, final String clientId, final String subscriptionName) {
Thread.currentThread().setName(threadName);
this.topicString = topicString;
this.clientId = clientId;
this.subscriptionName = subscriptionName;
}
@Override
public void run() {
try {
System.out.println(String.format("%s : Connecting...", Thread.currentThread().getName()));
MQTopicConnectionFactory cf = new MQTopicConnectionFactory();
cf.setHostName("xxxx");
cf.setPort(1416);
cf.setQueueManager("xxxx");
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
cf.setChannel("xxx");
cf.setClientID(clientId);
Connection con = cf.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
con.start();
Topic topic = session.createTopic(topicString);
MessageConsumer messageConsumer = session.createSharedConsumer(topic, subscriptionName); // fail here
System.out.println(String.format("%s : Waiting for a message...", Thread.currentThread().getName()));
Message msg = messageConsumer.receive();
System.out.println(String.format("%s : Received :\n%s", Thread.currentThread().getName(), msg));
}
catch (Exception ex) {
System.out.println(String.format("%s : FAILED", Thread.currentThread().getName()));
ex.printStackTrace();
}
}
}
}
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0026: Failed to subscribe to topic 'TESTSUB' with selector 'none' using MQSUB.
There may have been a problem creating the subscription due to it being used by another message consumer.
Make sure any message consumers using this subscription are closed before trying to create a new subscription under the same name. Please see the linked exception for more information.
at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:472)
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:214)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:212)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.checkJmqiCallSuccess(WMQMessageConsumer.java:112)
at com.ibm.msg.client.wmq.internal.WMQConsumerShadow.initialize(WMQConsumerShadow.java:1038)
at com.ibm.msg.client.wmq.internal.WMQSyncConsumerShadow.initialize(WMQSyncConsumerShadow.java:134)
at com.ibm.msg.client.wmq.internal.WMQMessageConsumer.<init>(WMQMessageConsumer.java:470)
at com.ibm.msg.client.wmq.internal.WMQSession.createSharedConsumer(WMQSession.java:938)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4228)
at com.ibm.msg.client.jms.internal.JmsSessionImpl.createSharedConsumer(JmsSessionImpl.java:4125)
at com.ibm.mq.jms.MQSession.createSharedConsumer(MQSession.java:1319)
at TestGB.lambda$0(TestGB.java:33)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2042' ('MQRC_OBJECT_IN_USE').
at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:202)
... 11 more
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.1.0</version>
</dependency>
问题不是与您的程序有关,而是与您订阅的主题相关联的模型队列有关。
在队列管理器上,如果您查看订阅将匹配的主题对象,它将有一个参数mndurmdl
指向模型队列。
如果您查看模型队列,您将注意到两个参数,其中任何一个或两个参数都可能导致您接收到的错误:
[ DEFSOPT( EXCL | SHARED ) ]
[ SHARE | NOSHARE ]
必须将它们设置为defsopt(SHARED
和share
。如果将其中一个设置为另一个值,则共享订阅上只能有一个订阅服务器。
对于IBM MQ pub/sub,当您创建一个JMS订阅时,MQ将其视为托管订阅,IBM MQ将在后台创建一个临时队列来订阅主题字符串。如果是非持久订阅,则队列是临时动态队列。
失败的原因是第一个线程将以独占模式打开临时动态队列,然后任何其他线程都无法打开临时动态队列,并且您收到mqrc_object_in_use
错误。
我怀疑这是因为IBM提供了一些不同的默认模型队列。
非持久订阅服务器的默认值具有以下设置:
QUEUE(SYSTEM.NDURABLE.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(SHARED) SHARE
QUEUE(SYSTEM.DEFAULT.MODEL.QUEUE) TYPE(QMODEL)
DEFSOPT(EXCL) NOSHARE
DEFINE QMODEL(xxx)
以后,您可以专门设置这两个参数,或者使用like
关键字定义它,以强制它使用不同的队列来建模设置,这两个命令如下:
DEFINE QMODEL(xxx) DEFSOPT(SHARED) SHARE
DEFINE QMODEL(xxx) LIKE(SYSTEM.NDURABLE.MODEL.QUEUE)
默认情况下,树的根节点由名为system.base.TOPIC
的标准主题对象表示,与此主题关联的默认模型队列如下所示:
TOPIC(SYSTEM.BASE.TOPIC) TYPE(LOCAL)
TOPICSTR() MDURMDL(SYSTEM.DURABLE.MODEL.QUEUE)
MNDURMDL(SYSTEM.NDURABLE.MODEL.QUEUE)
如果不定义任何进一步的管理主题对象,则所有主题都与system.base.TOPIC
匹配。此外,如果您没有定义任何进一步的管理主题对象,并且希望将应用程序权限授予主题树的特定子集(例如,以testsub
开头的主题字符串),则必须通过system.base.TOPIC
授予权限,这反过来又无限制地授予应用程序对任何任意主题字符串的访问权限。
最佳实践是创建一个带有主题字符串的主题对象,该主题字符串与应用程序应该访问的主题树的部分相匹配。对于TESTSUB/#
示例,如果管理员定义了一个新的主题对象并指定了topicstr(TESTSUB)
,则默认值将如下所示创建它:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL( )
MNDURMDL( )
空白的mdurmdl
和mndurmdl
值告诉MQ使用树中下一个最接近的较高主题对象的值,如果未定义任何其他内容,这将是system.base.topic
,并且模型队列仍然默认使用system.durable.model.queue
和system.ndurable.model.queue
模型队列。
管理员可以创建TOPIC对象并指定不同的模型队列,例如:
TOPIC(TESTSUB.TOPIC) TYPE(LOCAL)
TOPICSTR(TESTSUB) MDURMDL(TESTSUB.DURABLE.MODEL.QUEUE)
MNDURMDL(TESTSUB.NDURABLE.MODEL.QUEUE)
通过这样做,他们可以定义特定于应用程序的模型队列,这些队列具有共享订阅所需的设置,并且不会影响系统模型队列。另一个好处是,它们可以仅为以testsub
开头的主题字符串提供应用程序权限,例如testsub/a
或testsub/b
或testsub/x/y/z
。
我有一个单节点ActiveMQ实例,其中两个相互竞争的使用者连接到一个主题。主题订阅是根据JMS 2.0规范共享的。共享订阅确实保证只有其中一个订阅服务器(使用相同的订阅名称)获取消息。但我注意到的是,它不能保证只有在第一条消息得到确认的情况下才能传递第二条消息。如果第一个消费者需要时间来确认消息,则第二条消息甚至在消费者向代理发送第一条消息的确认之前就被传递给免费消费者。这是标准行为吗?有没有办
本文向大家介绍system.reactive 共享一个订阅(发布),包括了system.reactive 共享一个订阅(发布)的使用技巧和注意事项,需要的朋友参考一下 示例 给定一个IObservable<Offer>的offers从商家购买或以固定价格出售某些类型的项目,我们可以按照如下匹配对买家和卖家的: 问题在于,每个订阅trades将订阅offers两次。我们可以sellers和buyer
18:14:15,050警告 [org.springframework.JMS.listener.DefaultMessageListenerContainer](DefaultMessageListenerContainer-145)为目标“主题名称”设置JMS消息侦听器调用程序失败-正在尝试恢复。原因:[brm.10.2209]JMS:持久订阅“ConnectionFactory##Subsc
https://github.com/azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.azure.servicebus/topicsubscriptionwithruleoperationssample 现在我想添加一个筛选器/规则,这样只有通过筛选器中定义的特定条件的消息才应该给订阅。 例如,下面
我正在试验消息驱动Beans,以便从外部ActiveMQ实例接收主题订阅消息。 我的测试首先从队列订阅开始,它工作得很好。 然后我想尝试主题订阅,但我无法让它工作。 这就是我所拥有的: 会议记录。xml 这是MDB: 我不知道为什么,但从日志中我可以看到,TomEE创建了一个队列,而不是一个主题: 另一个证明是,当我添加持续时间配置时,服务器不会启动: 然后服务器抱怨这不适合配置类型javax.j
目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。