当前位置: 首页 > 知识库问答 >
问题:

如果处理失败,如何将STOMP消息重新传递给消费者?

鞠乐
2023-03-14

高层体系结构

JMS(生产者/消费者)<---->Artemis(STOMP)<---->Websocket-Broker-Relay-Service<---->STOMP-over-Websocket-client(生产者/消费者)

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization", "Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1, this);

    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
     try {
        stompMessagingService.handleReceivedMessages(msg);
        self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
       } catch (Exception e) {
           self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
       }

    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection {0}", e);
            }
        }
    }

}

共有1个答案

韩嘉祯
2023-03-14

当使用ActiveMQ Artemis时,STOMPACK帧告诉代理消息已被成功使用,因此应该将其从队列中删除。STOMPnack帧告诉代理消息没有被成功使用,因此代理将丢弃它。STOMP规范没有指定此处的确切行为。它只说:

NACKACK相反。它用于告诉服务器客户端没有使用消息。然后,服务器可以将消息发送到另一个客户机,丢弃它,或者将其放在死信队列中。确切的行为是特定于服务器的。

NACK采用与ACK相同的头:ID(REQUIRED)和transaction(OPTIONAL)。

在将来,我希望这种行为是可配置的。

 类似资料:
  • 我有两个消费群体,即G1和G2。 null 类似地,当G2轮询后,它仍然会找到关于主题的消息。这里还枯萎M3或M4会收到消息吗? 我也相信所有的成员都应该在同一个节点上。对吧?客户端代码或Kafka的责任是选择一个组中的特定成员吗?

  • 我正在使用WerbLogic 10.3.5和Spring 3.0实现JMS队列。我有以下Spring配置: 我的消息创建代码如下所示: 我的听众是这样的: 消息被正确创建,侦听器的onMessage()方法被调用,但是如果逻辑失败,我抛出RuntimeException(),消息不会被重新传递。我尝试了上述代码的许多细微变化(例如设置SessionAcknowledgeMemodeName=SES

  • 我读到:http://www.javaworld.com/article/2074123/java-web-development/transaction-and-redelivery-in-jms.html?page=2 "通常,确认特定消息会确认会话接收的所有先前消息"(在客户端确认模式下) “邮件重新传递不是自动的,但在某些情况下会重新传递邮件” 我的问题是: 如何确保每次收到消息时都有一个

  • FCM服务未向我的iOS应用程序发送消息。 > App CAN成功接收APNs令牌和实例ID令牌 App CAN使用推送通知实用程序利用. p8令牌在后台成功接收来自APN的推送 #2中使用的相同APNs密钥上传到Firebase控制台 应用程序无法接收Firebase控制台中Notification Composer发送的消息,也无法使用CURL请求接收消息。 应用程序在通过FCM发送时不显示任

  • 好的,事情是这样的,我已经在Stackoverflow中问了几个与firebase相关的问题,即使没有任何答案,我还是设法让firebase工作,并收到了一系列通知。我真的不知道我做了什么使它工作,这是一个结合了很多东西。 (之前发布的相关问题:如何使用webpack将Firebase云消息传送到ReactJS项目中|ReactJS和Webpack不工作的Firebase FCM:我们无法注册默认

  • 我想了解消费者信息处理尝试间隔时间的限制。例如,假设我有以下AWS资源 SQS队列(名为SQSQueueName1)w/reDrive配置为发送死信消息到SQSQueueName1DLQ SQS队列DLQ(名为SQSQueueName1DLQ) Lambda函数(名为LambdaName1) 如果SQSQueueName1有一个重置策略,MaxRecieveCount设置为10,在这种情况下,消费