当前位置: 首页 > 知识库问答 >
问题:

Spring集成:如何使用QueueChannel on-demand作为背压感知反应(流量)管道

赵智
2023-03-14

我有一个Spring集成队列通道,我想在背压感知流量管道中使用它。

  1. 预取n来自队列的消息。
  2. 使异步调用到外部系统像乐趣远程呼叫(消息:消息):单声道

我不想使用带有调度程序的轮询器提前从队列中提取消息。

在最新的Spring集成Java/Kotlin DSL中,最好的方法是什么,包括错误恢复等?


共有1个答案

万德海
2023-03-14

请参见IntegrationReactiveUtils。messageChannelToFlux()

/**
 * Adapt a provided {@link MessageChannel} into a {@link Flux} source:
 * - a {@link org.springframework.integration.channel.FluxMessageChannel}
 * is returned as is because it is already a {@link Publisher};
 * - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
 * for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
 * - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
 * {@link #messageSourceToFlux(MessageSource)}.
 * @param messageChannel the {@link MessageChannel} to adapt.
 * @param <T> the expected payload type.
 * @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
 */
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {

然后您可以使用IntegrationFlows.from(Publisher)

/**
 * Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
 * and subscribe it to the provided {@link Publisher}.
 * @param publisher the {@link Publisher} to subscribe to.
 * @return new {@link IntegrationFlowBuilder}.
 */
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
 类似资料:
  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?

  • 我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:

  • 如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一

  • 我正在尝试将SpringReactor与我的SpringBoot应用程序一起使用。 我正在使用ProjectReactor 3.0.7。松开并安装Spring护套1.5.3。释放 在我的服务类中有一个返回通量的方法。 我想将该值返回到web层中的控制器。但是,我没有看到json响应中返回的值。 当我从浏览器调用http://localhost:8080时,我得到的响应是,{“预取”:-1} 我不确

  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?

  • Spring integration提供了非反应的入站/出站WebSocket适配器,简单地说,它通过内部容器将会话与id相关联,您对消息进行一些处理,在出站时,它检查消息头是否有会话id,并通过该会话发送。 现在,Spring通过org.springframework.web.reactive.socket.WebSocketSession和其他类提供了反应性WebSocket支持,我想知道在反