我有一个Spring集成流,它产生的消息应该保留在周围,等待合适的消费者出现并使用它们。
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}
队列中的消息应该通过超文本传输协议作为服务器发送的事件传递,如下所示。
PublisherSubscription只是Publisher和IntegrationFlow注册的持有者,后者用于在不再需要时销毁动态创建的流(请注意,GET的传入消息没有内容,Web通量集成没有正确处理ATM,因此需要一个小的解决方法来访问推入客户
标头的路径变量):
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}
上述服务器发送事件的请求动态注册了一个流,该流根据需要向选择性消费者订阅队列通道,该消费者通过过滤器通过ThroweExceptiononRejection(true)实现。遵循消息处理程序链规范,该规范应确保向所有消费者提供消息,直到消费者接受为止。
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}
这种结构原则上可行,但存在以下问题:
我想要的是消息保留在队列中,并重复提供给所有订阅者,直到消息被消费或过期(适当的选择性消费者)。我该怎么做?
请注意,GET的传入消息没有内容,这没有被Web通量集成正确处理
我不理解这种担忧。
WebFluxInboundEndpoint与此算法配合使用:
if (isReadable(request)) {
...
else {
return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}
其中,GET
方法实际上转到了else分支。要发送的消息的有效载荷是一个多值映射。此外,我们最近还与您一起解决了发布在5.0.5版本中的帖子的问题:https://jira.spring.io/browse/INT-4462
Dispatcher没有订阅者
原则上不能在队列通道上发生。那里根本没有调度员。它只是队列,发送方提供要存储的消息。你错过了其他与我们分享的内容。但是让我们用它自己的名字来调用:messagesPerCustomerQueue在应用程序中不是一个队列通道。
更新
关于:
我想要的是消息保留在队列中,并重复提供给所有订阅者,直到消息被消费或过期(适当的选择性消费者)
我们看到的只是一个基于嵌入式ActiveMQ的可轮询JMSChannel,以支持消息的TTL。作为该队列的消费者,您应该拥有一个带有setMinSubscribers(1)的publishsubscribebchannel,以制作消息模板(MessagingTemplate),在没有订阅者时抛出消息传递异常(MessageDeliveryException)。这样,JMS事务将回滚,消息将返回到队列以进行下一个轮询周期。
内存中QueueChannel
的问题是没有事务重新传递,并且一旦从该队列中轮询消息就会丢失。
另一个类似于JMS(事务性)的选项是QueueChannel
的JdbcChannelMessageStore
。虽然这样我们没有TTL功能...
使用RabbitMQ,有没有一种方法可以将消息从队列“推送”给使用者,而不是让使用者从队列“轮询并拉出”消息? 这也是我目前正在进行的一个项目引起一些争论的原因。一个方面的论点是,让使用者(即windows服务)“轮询”队列直到新消息到达,与将消息从队列自动“推送”到订户/使用者的想法相比,这种想法有些低效,也不太理想。 我似乎只能找到支持消费者从队列中“轮询并拉出”消息的信息(例如,使用wind
我有一个向rabbitmq发送消息的服务,消费者对消息进行一些操作并重新排队。 我可以成功地将初始消息发送给rabbitmq,但问题是,如果消息需要修改,我无法将任何已使用的消息重新发送给rabbitmq。 我试图用new创建一个新类,但“MyService”始终为空
在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法
有一个SQS队列,我在其中不断收到消息。我只需要阅读和处理过去24小时内收到的信息。目前收到的信息应在第二天处理<代码>时间戳存储在消息正文中。 是否可以从SQS队列中选择性地读取消息。例如,只读那些值大于前一天的时间戳但小于当前时间戳(当前时间戳是此作业运行的时间)的消息?
消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?