我有一个SI DSL流,就像
@Bean
public IntegrationFlow processRequest() {
return flow -> flow.channel(REQUEST_INPUT)
.transform(...)
.enrichHeaders(h -> h.<Storage>headerFunction(...)))
.enrich(this::...)
//.route(ifReplayIsNeeded(), routeToSameFlow())
.enrich(this::...)
.route(..., ...)
.route(..., ...)
.enrich(this::...)
.handle(...)
//.route(ifReplayWasNeeded(), routeBack())
.route(..., ...)
.enrich(this::...)
.transform(...)
.channel(REQUEST_OUTPUT);
}
因此,当满足一个条件时(请参见ifreplayineeded()),则必须再次调用processRequest()流。然而,并非必须执行整个流程,而是几乎在最后执行(-
routeToSameFlow()
看起来像(Storage
用于存储请求/响应和流中使用的其他数据)
Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeToSameFlow() {
return rs -> rs.resolutionRequired(false)
.subFlowMapping(true, sf -> sf
// 1. storing the current Storage
.enrichHeaders(h -> h.<Storage>headerFunction("store", s -> s))
// 2. creating the req for the internal flow
.transform(internalRequestMapper, "mapFrom")
// 3. routing to the beginning of the flow
.route(Message.class, (m) -> REQUEST_INPUT)
// 4. defining the channel where the internal flow will return to
.channel("RETURN_CHANNEL")
)
.defaultOutputToParentFlow();
}
和routeBack()
Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeBack() {
return rs -> rs.resolutionRequired(false)
.subFlowMapping(true, sf ->
sf.route(Message.class, (m) -> "RETURN_CHANNEL")
)
.defaultOutputToParentFlow();
}
我肯定错过了一些概念,因为我得到了以下错误:
原因:org.springframework.beans.factory.BeanCreationException:“当前组件”(org.springframework.integration.router.MethodInvokingRouter@60a0f09f)是单向的“MessageHandler”,不适合配置“outputChannel”。这是集成流程的结束。
你能帮我实现这样的逻辑吗?我应该把主流分成更小的集成流吗<我希望尽可能少地干扰主流,这就是为什么我只想在开始时添加分流路线,在结束时添加回流路线的原因。有可能做到吗?
谢谢你<问候,
V。
您可能会使所有这些路由器的逻辑过于复杂。考虑在您的主流中间只有简单的Channel()
定义,并作为您想要返回的子流的输出。JavaDSL中没有限制,只是在开头和结尾都有Channel()
定义:您可以在流定义的任何点添加Channel()
。例如:
.handle(...)
.channel("middleOfTheFlow")
.route(ifReplayWasNeeded(), routeBack())
然后您的routeBack()
流定义最终可能只有. Channel("middleOfTheFlow")
,此处处理后的消息将被传递到主流的middleOfTheFlow
通道。
换句话说,一个集成流(IntegrationFlow)只是将一些业务功能的endpoint保持在同一位置的逻辑组件,但这并不意味着流中的所有endpoint都非常严格地相互关联。您始终可以在中间有一个通道,并从其他地方向该通道发送消息,订阅该通道的endpoint将处理该消息。
查看文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-数字用户线
我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。
当msg处理抛出异常时,如何有效地支持JMS重新交付? 我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。 我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。 对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新
我是Java新手,正在研究一些技术,我想知道是否有可能集成JSF、Spring和PrimeFaces。我正在寻找一些提示,但我只找到了JSF+Spring或Spring+Primefaces或Spring+JSF或JSF+Primefaces,但从来没有同时找到所有3个。有可能把它们都整合在一起? Att, 佩德罗·恩里克
我是spring集成和缓存新手,想知道如何将从出站网关接收到的对象添加到缓存中。无法确定所需的配置。 从以下配置,我从rest api收到的对象正在被记录: INFO:com.domain.IpAddress@74589991 我计划使用ehcache/caffiene,任何提示都会有帮助。 编辑2: 现在,我按照建议更改了出站网关: 并将ehache配置定义如下: 在我的服务类中,定义了可缓存的
Spring提供了特殊的类DelegatingVariableResolver,以无缝方式将JSF和Spring集成在一起。 在JSF中集成Spring依赖注入(IOC)功能需要以下步骤。 第1步:添加DelegatingVariableResolver 在faces-config.xml中添加一个variable-resolver条目,指向spring类DelegatingVariableRes
我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI