我最近添加了一个ActiveMQ服务器,使用Amazon自动伸缩组/负载平衡器来处理应用程序的水平伸缩。
我使用convertandsendtouser()
方法,该方法在应用程序的单个实例上工作,以定位经过身份验证的用户的“单独队列”,因此只有他们接收消息。
然而,当我启动负载html" target="_blank">均衡器后面的应用程序时,我发现只有当事件是在他们的websocket-proxy连接(到代理)建立的服务器上生成的时候,消息才会发送给用户?
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
handler.setTcpClient(new Reactor2TcpClient<>(
new StompTcpFactory(orgProperties.getAws().getAmazonMq().getStompRelayHost(),
orgProperties.getAws().getAmazonMq().getStompRelayPort(), orgProperties.getAws().getAmazonMq
().getSsl())
));
return handler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/queue", "/topic")
.setSystemLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
.setSystemPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass())
.setClientLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
.setClientPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass());
config.setApplicationDestinationPrefixes("/app");
}
通过检查SessionSubscribeEvent
(当用户订阅用户队列时,在侦听器中生成)中的标头,我找到了与ActiveMQ上生成的队列对应的名称SimpSessionID
。
@Override
@EventListener({SessionSubscribeEvent.class})
public void onSessionSubscribeEvent(SessionSubscribeEvent event) {
log.debug("Session Subscribe Event:" +
"{}", event.getMessage().getHeaders().toString());
}
在ActiveMQ中可以找到相应的队列,格式为:{simpDestination}-user{simpSessionId}
我可以将sessionId保存在键值对中,然后将消息推送到主题通道上吗?
我还发现了在connect/subscribe框架中设置ActiveMQ特定的STOMP属性以创建持久订阅者的一些可能性,如果我设置了这些属性,将会弹出比理解路由?
client-id
&subcriptionname
修改MessageBrokerReigstry配置
解决了这个问题:
config.enableStompBrokerRelay("/queue", "/topic")
.setUserDestinationBroadcast("/topic/registry.broadcast")
根据文件第4.4.13节中的这一段:
在多应用程序服务器场景中,由于用户连接到不同的服务器,用户目的地可能仍未解析。在这种情况下,您可以配置一个目的地来广播未解析的消息,以便其他服务器有机会尝试。这可以通过Java配置中MessageBrokerRegistry的userDestinationBroadcast属性和XML中message-broker元素的User-DestinationBroadcast属性来实现