import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
public class Main {
static String AMQ = "vm://0";
public static void main(String[] args) throws Exception {
EmbeddedActiveMQ server = null;
try {
server = createEmbeddedBroker();
var serverLocator = ActiveMQClient.createServerLocator(AMQ);
var clientSessionFactory = serverLocator.createSessionFactory();
createQueues(clientSessionFactory);
// queues are empty on creation
try (var session = clientSessionFactory.createSession()) {
assertQueueLength(session, "test.#", 0);
assertQueueLength(session, "test.A", 0);
}
sendMessage(clientSessionFactory, "test.A");
// expect message is delivered to both
try (var session = clientSessionFactory.createSession()) {
assertQueueLength(session, "test.#", 1);
assertQueueLength(session, "test.A", 1);
}
consumeMessage(clientSessionFactory, "test.#");
// expect message is consumed from both
try (var session = clientSessionFactory.createSession()) {
assertQueueLength(session, "test.#", 0); // ok - message gone
assertQueueLength(session, "test.A", 0); // fails!
}
} finally {
if (server != null) server.stop();
}
}
private static EmbeddedActiveMQ createEmbeddedBroker() throws Exception {
var config = new ConfigurationImpl();
config.addAcceptorConfiguration("vm", AMQ);
config.setSecurityEnabled(false);
config.setPersistenceEnabled(false);
var server = new EmbeddedActiveMQ();
server.setConfiguration(config);
server.start();
return server;
}
private static void createQueues(ClientSessionFactory csf) throws Exception {
var session = csf.createSession();
/*
<address name="test.A">
<anycast>
<queue name="test.A" />
</anycast>
</address>
*/
var testA = new QueueConfiguration("test.A")
.setRoutingType(RoutingType.ANYCAST)
.setAddress("test.A");
session.createQueue(testA);
/*
<address name="test.#">
<anycast>
<queue name="test.#" />
</anycast>
</address>
*/
var testWildcard = new QueueConfiguration("test.#")
.setRoutingType(RoutingType.ANYCAST)
.setAddress("test.#");
session.createQueue(testWildcard);
// also tried to create address without a queue, but the message to test.A is not delivered to test.#
// session.createAddress(new SimpleString("test.#"), RoutingType.ANYCAST, false);
}
private static void sendMessage(ClientSessionFactory csf, String queue) throws Exception {
var session = csf.createSession();
var producer = session.createProducer(queue);
producer.send(session.createMessage(true));
producer.close();
session.close();
}
private static void consumeMessage(ClientSessionFactory csf, String queue) throws Exception {
var session = csf.createSession();
var consumer = session.createConsumer(queue);
consumer.setMessageHandler(message -> {
try {
log("Consuming one message from " + queue);
message.acknowledge();
log("Consumed one message from " + queue);
} catch (ActiveMQException e) {
throw new IllegalStateException(e);
}
});
session.start();
Thread.sleep(1000); // hack to wait
consumer.close();
session.close();
}
private static void assertQueueLength(ClientSession session, String queue, long expected) throws Exception {
long actual = session.queueQuery(SimpleString.toSimpleString(queue)).getMessageCount();
if (actual != expected) {
throw new IllegalStateException("Queue " + queue + " has " + actual + " messages. Expected " + expected);
} else {
log("Queue " + queue + " has " + actual + " messages as expected");
}
}
private static void log(String msg) {
System.out.println(">>> " + msg);
}
}
依赖关系:
org.apache.activemq:artemis-core-client:2.17.0
org.apache.activemq:artemis-server:2.17.0
你所看到的是预期的行为。这里要记住的关键是,您使用的是通配符路由,而不是通配符消耗。使用通配符路由,消息不仅被路由到显式发送消息的地址的队列,而且还被路由到匹配通配符地址的任何队列。消息路由到的每个队列都有自己的消息副本。
通配符路由是在考虑多播(即pub/sub)用例(如分层主题)的情况下实现的,但如果您想将其与选播一起使用,有以下几个选项:
test.A
without a queue, e.g.: session.createAddress(SimpleString.toSimpleString("test.A"), RoutingType.ANYCAST, false);
var testA = new QueueConfiguration("test.A")
.setRoutingType(RoutingType.ANYCAST)
.setPurgeOnNoConsumers(true)
.setAddress("test.A");
使用使用者的通配符从多个队列接收消息是一个非常方便的特性,但在ActiveMQ Artemis中没有实现。然而,创建多个消费者应该不是很难。
ActiveMQ:5.10.2在ServiceMix的Karaf OSGi中 卡哈布坚持。 默认代理设置。连接中的默认设置(TCP://x.x.x.x.x:61616) 一切正常,但是:如果我将消费者的数量减少到1(或者2或3个,我不知道阈值在哪里),那么来自1个队列的消息将被消耗,来自另一个队列的消息将被存储。过了一段时间,我看到了这张照片: 1用户停止接收消息。他认为没有更多消息了。 从act
本文向大家介绍PHP多进程通信-消息队列使用,包括了PHP多进程通信-消息队列使用的使用技巧和注意事项,需要的朋友参考一下 向消息队列发送数据和获取数据的测试 以上所述是小编给大家介绍的PHP通信-消息队列使用详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对呐喊教程网站的支持!
我是JMS新手,经过长时间的搜索,我搜索出了一个连接到JMS的代码,并发布了一条消息。 问题是我需要在远程队列中发布消息,但我不知道如何建立连接到它并发布消息。 服务器类型:TIBCO EMS 服务器主机:******。net 端口:**USername:user passsbrow:user123 队列:**。。。。顺序经营1. 我想建立连接,发布一条简单的消息,然后把它取回。请帮忙!提前谢谢
我正在阅读OracleDocGenericmethod中的泛型方法。当它说什么时候使用通配符和什么时候使用泛型方法时,我对比较感到很困惑。引用文档。 我们本可以在这里使用通用方法: [...]这告诉我们类型参数正被用于多态;它的唯一作用是允许在不同的调用站点使用各种实际的参数类型。如果是这样的话,应该使用通配符。通配符旨在支持灵活的子类型,这就是我们在这里试图表达的。 我们不觉得像《代码》(Col
问题内容: 使用Type通配符的Update()也存在该问题,但是我发现DocumentExists()的作用相同,因此在此将问题简化如下: 这有效… 但这失败了 如果我完全省略Type,它也会失败。有人知道如何进行这项工作吗?(即使不管文档的类型如何都可以,对我而言还是可以的。) 问题答案: 据我所知,不可能在类型名称中指定通配符,但是您可以做一些技巧。 您可以在索引中查询具有特定ID的文档,并
问题内容: 这是来自 HeadFirst Java的 :(第575页) 这个: 做与此相同的事情: 所以这是我的问题:如果它们完全相同,我们为什么不写 要么 另外,什么时候使用?是有用的?而不是使用泛型的方法声明(如上所述)中的T或用于类声明?有什么好处? 问题答案: 之间的最大区别 和 是在前一种方法中,您可以在方法中将“ T”作为给出的具体类。在第二种方法中,您无法执行此操作。 这里有一个更复