当前位置: 首页 > 知识库问答 >
问题:

Spring集成:将排队消息分派给选择性消费者

诸葛皓
2023-03-14

我有一个Spring集成流,它产生的消息应该保留在周围,等待合适的消费者出现并使用它们。

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue()) 
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}

队列中的消息应该通过超文本传输协议作为服务器发送的事件传递,如下所示。

PublisherSubscription只是Publisher和IntegrationFlow注册的持有者,后者用于在不再需要时销毁动态创建的流(请注意,GET的传入消息没有内容,Web通量集成没有正确处理ATM,因此需要一个小的解决方法来访问推入客户标头的路径变量):

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
       .from(WebFlux.inboundGateway("/events/{customer}")
            .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
            .headerExpression("customer", "#pathVariables.customer")
            .payloadExpression("''") // neeeded to make handle((p,h) work
       )
       .log()
       .handle((p, h) -> {
           String customer = h.get("customer").toString();
           PublisherSubscription<JsonNode> publisherSubscription =
               subscribeToMessagesPerCustomer(customer);
           return Flux.from(publisherSubscription.getPublisher())
                   .map(Message::getPayload)
                   .doFinally(signalType ->
                      publisherSubscription.unsubscribe());
       })
       .get();
}

上述服务器发送事件的请求动态注册了一个流,该流根据需要向选择性消费者订阅队列通道,该消费者通过过滤器通过ThroweExceptiononRejection(true)实现。遵循消息处理程序链规范,该规范应确保向所有消费者提供消息,直到消费者接受为止。

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();

    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();

    return new PublisherSubscription<>(messagePublisher, registration);
}

这种结构原则上可行,但存在以下问题:

  • 在没有订阅者的情况下发送到队列的消息会导致MessageDeliveryException:Dispatcher没有通道应用程序的订阅者。messagesPerCustomerQueue’

我想要的是消息保留在队列中,并重复提供给所有订阅者,直到消息被消费或过期(适当的选择性消费者)。我该怎么做?

共有1个答案

亢奇
2023-03-14

请注意,GET的传入消息没有内容,这没有被Web通量集成正确处理

我不理解这种担忧。

WebFluxInboundEndpoint与此算法配合使用:

if (isReadable(request)) {
   ...
else {
    return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}

其中,GET方法实际上转到了else分支。要发送的消息的有效载荷是一个多值映射。此外,我们最近还与您一起解决了发布在5.0.5版本中的帖子的问题:https://jira.spring.io/browse/INT-4462

Dispatcher没有订阅者

原则上不能在队列通道上发生。那里根本没有调度员。它只是队列,发送方提供要存储的消息。你错过了其他与我们分享的内容。但是让我们用它自己的名字来调用:messagesPerCustomerQueue在应用程序中不是一个队列通道。

更新

关于:

我想要的是消息保留在队列中,并重复提供给所有订阅者,直到消息被消费或过期(适当的选择性消费者)

我们看到的只是一个基于嵌入式ActiveMQ的可轮询JMSChannel,以支持消息的TTL。作为该队列的消费者,您应该拥有一个带有setMinSubscribers(1)的publishsubscribebchannel,以制作消息模板(MessagingTemplate),在没有订阅者时抛出消息传递异常(MessageDeliveryException)。这样,JMS事务将回滚,消息将返回到队列以进行下一个轮询周期。

内存中QueueChannel的问题是没有事务重新传递,并且一旦从该队列中轮询消息就会丢失。

另一个类似于JMS(事务性)的选项是QueueChannelJdbcChannelMessageStore。虽然这样我们没有TTL功能...

 类似资料:
  • 使用RabbitMQ,有没有一种方法可以将消息从队列“推送”给使用者,而不是让使用者从队列“轮询并拉出”消息? 这也是我目前正在进行的一个项目引起一些争论的原因。一个方面的论点是,让使用者(即windows服务)“轮询”队列直到新消息到达,与将消息从队列自动“推送”到订户/使用者的想法相比,这种想法有些低效,也不太理想。 我似乎只能找到支持消费者从队列中“轮询并拉出”消息的信息(例如,使用wind

  • 我有一个向rabbitmq发送消息的服务,消费者对消息进行一些操作并重新排队。 我可以成功地将初始消息发送给rabbitmq,但问题是,如果消息需要修改,我无法将任何已使用的消息重新发送给rabbitmq。 我试图用new创建一个新类,但“MyService”始终为空

  • 在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法

  • 有一个SQS队列,我在其中不断收到消息。我只需要阅读和处理过去24小时内收到的信息。目前收到的信息应在第二天处理<代码>时间戳存储在消息正文中。 是否可以从SQS队列中选择性地读取消息。例如,只读那些值大于前一天的时间戳但小于当前时间戳(当前时间戳是此作业运行的时间)的消息?

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?