我有一个Spring集成队列通道,我想在背压感知流量管道中使用它。
n
来自队列的消息。乐趣远程呼叫(消息:消息):单声道
我不想使用带有调度程序的轮询器提前从队列中提取消息。
在最新的Spring集成Java/Kotlin DSL中,最好的方法是什么,包括错误恢复等?
请参见IntegrationReactiveUtils。messageChannelToFlux()
:
/**
* Adapt a provided {@link MessageChannel} into a {@link Flux} source:
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
* is returned as is because it is already a {@link Publisher};
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
* for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
* {@link #messageSourceToFlux(MessageSource)}.
* @param messageChannel the {@link MessageChannel} to adapt.
* @param <T> the expected payload type.
* @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
*/
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
然后您可以使用IntegrationFlows.from(Publisher)
:
/**
* Populate a {@link FluxMessageChannel} to the {@link IntegrationFlowBuilder} chain
* and subscribe it to the provided {@link Publisher}.
* @param publisher the {@link Publisher} to subscribe to.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?
我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:
如何应用背压来限制生产比并行运行的更多的项目? 为了说明起见,这里有一个快速twitter用户名生成器、一个慢速twitter查找调用、一个慢速twitter文件编写器和一个打印方法。 最终目标是并行运行twitter查找,同时对生成器施加反压力,使其不会发出超出可处理范围的用户名(预计会有一些预取)。 这很好地在一个单独的线程上生成了5个twitter用户名 不确定它是正确的,但我的理由是,从一
我正在尝试将SpringReactor与我的SpringBoot应用程序一起使用。 我正在使用ProjectReactor 3.0.7。松开并安装Spring护套1.5.3。释放 在我的服务类中有一个返回通量的方法。 我想将该值返回到web层中的控制器。但是,我没有看到json响应中返回的值。 当我从浏览器调用http://localhost:8080时,我得到的响应是,{“预取”:-1} 我不确
我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?
Spring integration提供了非反应的入站/出站WebSocket适配器,简单地说,它通过内部容器将会话与id相关联,您对消息进行一些处理,在出站时,它检查消息头是否有会话id,并通过该会话发送。 现在,Spring通过org.springframework.web.reactive.socket.WebSocketSession和其他类提供了反应性WebSocket支持,我想知道在反