我正在使用ActiveMQ Artemis 2.17.0,并且面临路由问题。
我实现了一个插件,它记录了before消息路由,我看到一些消息从topic.private.abc.task.v1
路由到topic.abc.rawmessage.v1
。
没有转移设置,主题和队列由生产者和消费者动态创建。有一个将目标集群化.*.>
映射到虚拟主题的设置
private TransportConfiguration getServerTransportConfiguration() {
Map<String, Object> extraProps = new HashMap<>();
extraProps.put("virtualTopicConsumerWildcards", "clustered.*.>;2");
Map<String, Object> params = new HashMap<>();
params.put("scheme", "tcp");
params.put("port", port);
params.put("host", hostname);
return new TransportConfiguration("org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory", params, "netty-acceptor", extraProps);
}
topic.private.abc.task.v1
和topic.abc.rawmessage.v1
都是有效的主题,但它们不应该被链接。
什么能解释这种行为?
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
Map<String, Object> map = new HashMap<>();
map.put("RoutingContext", new RoutingContextLogView(context));
logger.info(mapper.writeValueAsString(map));
ActiveMQServerPlugin.super.beforeMessageRoute(message, context, direct, rejectDuplicates);
}
public class RoutingContextLogView {
private RoutingContext routingContext;
public RoutingContextLogView(RoutingContext routingContext) {
this.routingContext = routingContext;
}
public String getAddress() {
return routingContext.getAddress() != null ? routingContext.getAddress().toString() : null;
}
public String getPreviousAddress() {
return routingContext.getPreviousAddress() != null ? routingContext.getPreviousAddress().toString() : null;
}
public String getRoutingType() {
return routingContext.getRoutingType() != null ? routingContext.getRoutingType().name() : null;
}
public String getPreviousRoutingType() {
return routingContext.getPreviousRoutingType() != null ? routingContext.getPreviousRoutingType().name() : null;
}
}
代理在内部使用的routingcontext
html" target="_blank">对象是可重用的。这样做是出于性能原因,以防止无论如何都必须为每个路由操作重新创建routingcontext
。正如人们可能猜到的那样,路由消息是代理中非常常见的操作,因此尽可能优化它是值得的。重用routingcontext
意味着创建和丢弃的对象更少,这意味着需要清理的垃圾更少,这意味着代理的停顿更少,总体性能更好。
这里的previousaddress
与当前消息的路由地址不同,这不是问题。它只是意味着上下文不会被重新用于此路由操作,因此将被清除。顾名思义,BeforeMessageRoute
方法是在执行任何路由逻辑之前调用的(例如,清除RoutingContext
)。如果使用AfterMessageRoute
检查RoutingContext
,那么您应该看到它被清除并填充了正确的详细信息。
消息“发送”和消息“路由”(两者都有插件挂钩)是相关但不同的操作。响应客户端操作“发送”消息。发送总是导致路由。但是,并不是所有的路由都是发送的结果。由于不涉及发送的内部代理操作(例如,在集群中移动消息、使消息过期、将无法传递的消息取消到死信地址、使用转移等),消息可以被路由。
我要提醒您不要检查内部代理状态(可能很微妙),当其他一切都表明代理正常运行时,不要假设存在问题。在本例中,您说您“面临路由问题”,并且“一些消息从topic.private.abc.task.v1
路由到topic.abc.rawmessage.v1
”,而实际上没有路由问题,消息实际上并没有从topic.private.abc.rawmessage.v1
路由到topic.abc.rawmessage.v1
。据我所见,实际上一切都正常运转。
null 订阅目标,不带选择器/筛选器 发送到目标,标题为clientId=ID(服务器将请求的客户端的ID) 客户123: 订阅目标,选择器clientId=123 发送到目标,标头clientId=123 Client123断开连接:删除多播地址和filter clientId=123的协同响应多播队列 服务器向客户端123的TaskRequest发送消息:根据服务器上的客户端,消息发送成功。
我有这样的场景: 具有多个选播队列的两个地址。我需要将消息路由到指定的队列中的地址。
第一,你能确认行为的改变吗?第二,有没有恢复的方法?还有一个额外的好处,如果有人知道为什么会发生这种变化,我很想理解。
有人能指导我如何在ActiveMQ Artemis Broker上拦截MQTT消息吗?我按照手册中的建议进行了尝试,但是MQTT消息没有被拦截。然而,消息的发布和订阅工作正常。 broker类的完整代码位于https://codeshare.io/snzsb
你知道这里发生了什么吗?可以恢复消息和队列吗?如何使用ActiveMQ Artemis数据?所有队列/消息都丢失了。只有dlq/expiryqueue。 我们必须更改配置。所以我们开始更新从机。 为了将故障转移到从服务器以修复主服务器,我们执行了@16:32 在Master上有一些警告,但不多。 在从服务器上,我们可以看到一些超时和连接失败。不确定Slave是否接管了队列并且工作正常。 @16:4
我正在试用ActiveMQ Artemis进行消息传递设计。我期待消息与嵌入的文件内容(字节)。我不希望它们大于10MB。但是,我想知道在Artemis中是否有一种可配置的方法来处理这个问题。它是否支持默认的最大消息大小?我试着寻找答案,但没有找到任何答案。另外,我的生产者和消费者都是。NET AMQP实现。