高层体系结构
JMS(生产者/消费者)<---->Artemis(STOMP)<---->Websocket-Broker-Relay-Service<---->STOMP-over-Websocket-client(生产者/消费者)
class StompSessionHandlerImpl implements StompSessionHandler {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
session.setAutoReceipt(Boolean.FALSE);
StompHeaders headers1 = new StompHeaders();
headers1.setDestination("/queue/msg");
headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
headers1.add("Authorization", "Bearer ".concat(token));
headers1.setAck("client-individual");
session.subscribe(headers1, this);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
synchronized (StompSessionHandlerImpl.msgSenderLock) {
if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
initStompSession();
}
}
}
@Override
public Type getPayloadType(StompHeaders headers) {
return COMessage.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (payload == null) return;
COMessage msg = (COMessage) payload;
try {
stompMessagingService.handleReceivedMessages(msg);
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
} catch (Exception e) {
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
}
}
@PreDestroy
public void cleanUp() {
self.stompMessagingService.getStompSession().disconnect();
}
}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
@Bean
public WebSocketStompClient stompClient() {
WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
converter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(converter);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10000);
scheduler.initialize();
stompClient.setTaskScheduler(scheduler);
stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
return stompClient;
}
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private String host;
private String password;
private String user;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
.setRelayHost(host)
.setClientLogin(user)
.setClientPasscode(password)
.setSystemHeartbeatSendInterval(20000)
.setSystemLogin(user)
.setSystemPasscode(password)
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");
config.setApplicationDestinationPrefixes("/device");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
registry.setErrorHandler(new StompSubProtocolErrorHandler());
}
@Bean
public DefaultSimpUserRegistry getDefaultSimpRegistry() {
return new DefaultSimpUserRegistry();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(Integer.MAX_VALUE);
registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
registry.setTimeToFirstMessage(300000);
registry.setSendTimeLimit(300000);
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new EmaWebSocketHandlerDecorator(webSocketHandler);
}
});
}
}
class ArtemisConfig extends ArtemisAutoConfiguration {
@Bean("mqConnectionFactory")
public ConnectionFactory senderActiveMQConnectionFactory() {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(-1L);
connectionFactory.setClientID(clientID);
connectionFactory.setEnableSharedClientID(true);
connectionFactory.setPreAcknowledge(Boolean.FALSE);
return connectionFactory;
}
@Bean("mqCachingConnectionFactory")
@Primary
public ConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setMessageConverter(jsonMessageConverter);
jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
return jmsTemplate;
}
@PreDestroy
public void cleanUp() {
if (connection.isStarted()) {
try {
connection.close();
} catch (JMSException e) {
log.error("Failed to close the JMS connection {0}", e);
}
}
}
}
当使用ActiveMQ Artemis时,STOMPACK
帧告诉代理消息已被成功使用,因此应该将其从队列中删除。STOMPnack
帧告诉代理消息没有被成功使用,因此代理将丢弃它。STOMP规范没有指定此处的确切行为。它只说:
NACK
与ACK
相反。它用于告诉服务器客户端没有使用消息。然后,服务器可以将消息发送到另一个客户机,丢弃它,或者将其放在死信队列中。确切的行为是特定于服务器的。
NACK
采用与ACK
相同的头:ID
(REQUIRED)和transaction
(OPTIONAL)。
在将来,我希望这种行为是可配置的。
我有两个消费群体,即G1和G2。 null 类似地,当G2轮询后,它仍然会找到关于主题的消息。这里还枯萎M3或M4会收到消息吗? 我也相信所有的成员都应该在同一个节点上。对吧?客户端代码或Kafka的责任是选择一个组中的特定成员吗?
我正在使用WerbLogic 10.3.5和Spring 3.0实现JMS队列。我有以下Spring配置: 我的消息创建代码如下所示: 我的听众是这样的: 消息被正确创建,侦听器的onMessage()方法被调用,但是如果逻辑失败,我抛出RuntimeException(),消息不会被重新传递。我尝试了上述代码的许多细微变化(例如设置SessionAcknowledgeMemodeName=SES
我读到:http://www.javaworld.com/article/2074123/java-web-development/transaction-and-redelivery-in-jms.html?page=2 "通常,确认特定消息会确认会话接收的所有先前消息"(在客户端确认模式下) “邮件重新传递不是自动的,但在某些情况下会重新传递邮件” 我的问题是: 如何确保每次收到消息时都有一个
FCM服务未向我的iOS应用程序发送消息。 > App CAN成功接收APNs令牌和实例ID令牌 App CAN使用推送通知实用程序利用. p8令牌在后台成功接收来自APN的推送 #2中使用的相同APNs密钥上传到Firebase控制台 应用程序无法接收Firebase控制台中Notification Composer发送的消息,也无法使用CURL请求接收消息。 应用程序在通过FCM发送时不显示任
好的,事情是这样的,我已经在Stackoverflow中问了几个与firebase相关的问题,即使没有任何答案,我还是设法让firebase工作,并收到了一系列通知。我真的不知道我做了什么使它工作,这是一个结合了很多东西。 (之前发布的相关问题:如何使用webpack将Firebase云消息传送到ReactJS项目中|ReactJS和Webpack不工作的Firebase FCM:我们无法注册默认
我想了解消费者信息处理尝试间隔时间的限制。例如,假设我有以下AWS资源 SQS队列(名为SQSQueueName1)w/reDrive配置为发送死信消息到SQSQueueName1DLQ SQS队列DLQ(名为SQSQueueName1DLQ) Lambda函数(名为LambdaName1) 如果SQSQueueName1有一个重置策略,MaxRecieveCount设置为10,在这种情况下,消费