当前位置: 首页 > 知识库问答 >
问题:

spring-integration并行分裂路由聚合流由于单向MessageHandler而失败

常鸿朗
2023-03-14

我希望通过拆分项目、将每个项目路由到相应的网关并聚合结果来并行处理项目列表。但是,我的应用程序没有启动,我得到以下错误:

BeanCreationException: The 'currentComponent' ... is a one-way 'MessageHandler' 
and it isn't appropriate to configure 'outputChannel'. 
This is the end of the integration flow.

这是一个示例流定义,它说明了行为:

@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle(message -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, oddFlow())
                    .subFlowMapping(false, evenFlow()))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}

我看到Error'是spring-integration aggregator DSL的单向'MessageHandler',但是这里的解决方案不适用,我没有登录handle()方法。我还试图将。DefaultOutputToparentFlow()添加到mappingDefinition中,因为cafe示例使用它,但这也没有什么不同。

共有1个答案

柴砚文
2023-03-14

你的问题在这里:

.handle(message -> Arrays.asList(1, 2, 3))

如果要使用内联实现,其外观如下所示:

.handle(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            Arrays.asList(1, 2, 3);
        }
})

注意void返回类型。由于没有什么要返回,因此也没有什么要向下游发送--是单向的'MessageHandler'

 .handle((p, h) -> Arrays.asList(1, 2, 3))
.handle(new GenericHandler<Object>() {

        @Override
        public Object handle(Object p, Map<String, Object> h) {
            return Arrays.asList(1, 2, 3);
        }
})
@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle((p, h) -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, sf -> sf.gateway(oddFlow()))
                    .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}
 类似资料:
  • 我是nodejs测试的新手,使用mocha和Chai。现在,我在使用mocha测试API路由处理程序时遇到了问题。我的路由处理程序代码是 Upload函数只是一个multer实用程序,用于存储imagefile。我的测试代码是 运行此代码后显示的输出为

  • 我想对3条路由使用Apache Camel并行组播,聚合(并等待)其中的2条路由,而让第3条路由自行进行(第3条路由不应阻塞前两条路由)。我还需要在“所有”情况下处理这两个,这意味着如果其中一个失败(例如在处理过程中抛出异常),也应该对其进行聚合。 根据我从Camel文档中了解到的情况,只要不指定StoponException,该行为就应该是“默认的”。但发生的情况是exchange异常永远不会到

  • 通过Group可以实现路由分组,Group 路由分组可以简化你的路由撰写: 有两种方法来使用Group: 第一种,创建Group对象,通过Group方法传入 g := tango.NewGroup() g.Get("/1", func() string { return "/1" }) g.Post("/2", func() string { return "/2" }) o :=

  • 我正在尝试在最新版本的 Spring Cloud 流中使用基于内容的路由。根据这份文件 - 这是我用StreamListener编写的代码 通过使用该条件,可以将消息路由到两个不同的函数。 我正试图用如下的功能接口方法来消费消息。 如何在函数中实现类似的基于内容的路由?蒂亚。 其他细节- Spring引导版本 - 2.3.12.发布 Spring云版 - Hoxton.SR11

  • 路由执行体 Tango 支持 5 种形式的函数或结构体方法作为执行体: func() func(http.ResponseWriter, *http.Request) func(*tango.Context) func(http.Response.Writer) func(*http.Request) struct.Get() func() t := tango.Classic() t.Get("

  • 我已经查看了Spring Integration and DSL Upgrade-One-way“MessageHandler”,配置“Output Channel”错误是不合适的,在我看来,我的解决方案的形状就在这里,但解决方案的表达方式对我来说是没有意义的。 spring-integration并行分裂路由聚合流失败,原因是单向MessageHandler更容易理解,但我没有看到处理程序方法(