当前位置: 首页 > 知识库问答 >
问题:

Spring集成-重播相同的流

梁浩涆
2023-03-14

我有一个SI DSL流,就像

@Bean
public IntegrationFlow processRequest() {
    return flow -> flow.channel(REQUEST_INPUT)
            .transform(...)
            .enrichHeaders(h -> h.<Storage>headerFunction(...)))
            .enrich(this::...)

            //.route(ifReplayIsNeeded(), routeToSameFlow())

            .enrich(this::...)
            .route(..., ...)
            .route(..., ...)
            .enrich(this::...)
            .handle(...)

            //.route(ifReplayWasNeeded(), routeBack())

            .route(..., ...)
            .enrich(this::...)
            .transform(...)
            .channel(REQUEST_OUTPUT);
}

因此,当满足一个条件时(请参见ifreplayineeded()),则必须再次调用processRequest()流。然而,并非必须执行整个流程,而是几乎在最后执行(-

routeToSameFlow()看起来像(Storage用于存储请求/响应和流中使用的其他数据)

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeToSameFlow() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(true, sf -> sf
                    // 1. storing the current Storage
                    .enrichHeaders(h -> h.<Storage>headerFunction("store", s -> s))
                    // 2. creating the req for the internal flow
                    .transform(internalRequestMapper, "mapFrom")
                    // 3. routing to the beginning of the flow
                    .route(Message.class, (m) -> REQUEST_INPUT)
                    // 4. defining the channel where the internal flow will return to
                    .channel("RETURN_CHANNEL")
            )
            .defaultOutputToParentFlow();
}

和routeBack()

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeBack() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(true, sf ->
                    sf.route(Message.class, (m) -> "RETURN_CHANNEL")
            )
            .defaultOutputToParentFlow();
}

我肯定错过了一些概念,因为我得到了以下错误:

原因:org.springframework.beans.factory.BeanCreationException:“当前组件”(org.springframework.integration.router.MethodInvokingRouter@60a0f09f)是单向的“MessageHandler”,不适合配置“outputChannel”。这是集成流程的结束。

你能帮我实现这样的逻辑吗?我应该把主流分成更小的集成流吗<我希望尽可能少地干扰主流,这就是为什么我只想在开始时添加分流路线,在结束时添加回流路线的原因。有可能做到吗?

谢谢你<问候,
V。

共有1个答案

林和煦
2023-03-14

您可能会使所有这些路由器的逻辑过于复杂。考虑在您的主流中间只有简单的Channel()定义,并作为您想要返回的子流的输出。JavaDSL中没有限制,只是在开头和结尾都有Channel()定义:您可以在流定义的任何点添加Channel()。例如:

 .handle(...)
 .channel("middleOfTheFlow")
 .route(ifReplayWasNeeded(), routeBack())

然后您的routeBack()定义最终可能只有. Channel("middleOfTheFlow"),此处处理后的消息将被传递到主流的middleOfTheFlow通道。

换句话说,一个集成流(IntegrationFlow)只是将一些业务功能的endpoint保持在同一位置的逻辑组件,但这并不意味着流中的所有endpoint都非常严格地相互关联。您始终可以在中间有一个通道,并从其他地方向该通道发送消息,订阅该通道的endpoint将处理该消息。

查看文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-数字用户线

 类似资料:
  • 我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。

  • 当msg处理抛出异常时,如何有效地支持JMS重新交付? 我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。 我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。 对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新

  • 我是Java新手,正在研究一些技术,我想知道是否有可能集成JSF、Spring和PrimeFaces。我正在寻找一些提示,但我只找到了JSF+Spring或Spring+Primefaces或Spring+JSF或JSF+Primefaces,但从来没有同时找到所有3个。有可能把它们都整合在一起? Att, 佩德罗·恩里克

  • 我是spring集成和缓存新手,想知道如何将从出站网关接收到的对象添加到缓存中。无法确定所需的配置。 从以下配置,我从rest api收到的对象正在被记录: INFO:com.domain.IpAddress@74589991 我计划使用ehcache/caffiene,任何提示都会有帮助。 编辑2: 现在,我按照建议更改了出站网关: 并将ehache配置定义如下: 在我的服务类中,定义了可缓存的

  • Spring提供了特殊的类DelegatingVariableResolver,以无缝方式将JSF和Spring集成在一起。 在JSF中集成Spring依赖注入(IOC)功能需要以下步骤。 第1步:添加DelegatingVariableResolver 在faces-config.xml中添加一个variable-resolver条目,指向spring类DelegatingVariableRes

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI