我尝试在Spring集成中访问flux对象,而不将流声明拆分为两个函数。我想知道如何执行以下操作:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows.from(somePublisher)
// Access publisher here to perform something like:
.handle(flux -> flux.buffer(Duration.ofMillis(200))
.handle(writeToS3)
.get();
}
我不介意将我在评论中提到的通量操作转移到另一个类(可能是为了作为某种网关),但对我来说,从同一个mainFlow
函数启动和流显然非常重要,因此它将非常清晰易读,能够理解我在应用程序中所做的事情。我看到了Monos网关的文档,但示例代码甚至不可能(它们讨论的是函数中没有的通量,作为初学者,我很难理解那里发生了什么)。
IntegrationFlows.from(某个Publisher)
为提供的Publisher
启动一个ractive流。流的其余部分针对源Publisher
中的每一个事件执行。因此,您的。句柄(通量-
如果您的想法是为源
并继续下去,那么请考虑使用reactive()自定义程序:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#fluxmessagechannel-and-reactivestreamsconsumer.Publisher
应用缓冲区()
因此,我将使用以下方法代替handle()
:
.bridge(e -> e.reactive(flux -> flux.buffer(Duration.ofMillis(200)))
下一个句柄(WriteToS3)
将针对缓冲的List
结果执行,因此要小心在WriteToS3
中执行和期望的操作。
我试图理解在Spring集成中聚合时返回的类型,这相当困难。我正在使用Project Reactor,我的代码片段是: 除了理解示例中传递的类型之外,我还想知道如何才能知道中流动的对象及其类型。
如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }
我需要一个http入站流通道,类似于ftp流适配器通道(http://docs.spring.io/spring-integration/docs/4.3.9.RELEASE/reference/html/ftp.html#ftp-流媒体)但我找不到,SI支持吗?如果不是,是否有可能解决问题? 我需要从http流通道接收soap消息,使用SAX转换消息,然后将其发送到http出站流通道
我使用的是带有spring Integration 5.3.0的spring boot 2.3.0版本,不知何故,我无法让下面的代码正常工作。应用程序启动,没有错误,但当控制到方法时,什么也没有发生或打印。谁能告诉我我错过了什么。任何帮助都很感激。谢了。
我不是很理解,因为我可以更改所有集成流的错误通道。我需要处理像InvalidAccessTokenException这样的异常,它们可以在路由器内部的子流中抛出。 我所尝试的是通过以下方式处理来自默认通道“ErrorChannel”的异常: 此错误由具有以下签名的方法处理: 集成流的配置我解释如下: 可以在任何社交网络服务中启动的异常示例如下: 是否可能将此异常绑定到特定的错误通道?。 以下是日志
在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction