当前位置: 首页 > 知识库问答 >
问题:

如何纠正“Spring integration是单向的'MessageHandler',不适合配置'Output Channel'”异常?

郏景澄
2023-03-14

我已经查看了Spring Integration and DSL Upgrade-One-way“MessageHandler”,配置“Output Channel”错误是不合适的,在我看来,我的解决方案的形状就在这里,但解决方案的表达方式对我来说是没有意义的。

spring-integration并行分裂路由聚合流失败,原因是单向MessageHandler更容易理解,但我没有看到处理程序方法(无论它是什么)在哪里返回了一个空白,从而导致了这个打嗝。

异常Spring integration是一个单向的“MessageHandler”,不适合为.aggregate()工厂方法设置“output channel”。队列是基于应用程序收集的元数据在运行时动态构造的。

根据对框架的调试,看起来.routeToreCipients(r->this.buildrecipientlistrouterspecforrules(r,rules))子句返回一个void。向recipientListRouter添加.DefaultOutputToparentFlow()可以消除异常,但可能不是正确的解决方案,因为当我进行此调整时,流实际上并没有启动。

我欢迎任何建议。

代码片段:

        StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                    .headerExpression("yyy", "payload[0].get(\"yyy\")")
                    )
            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
                    .headerExpression("bbb", "payload[0].get(\"bbb\")")
                    )
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
            .aggregate()
            .handle(cleanupAdapter).get();

    return flow;



private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules) {
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));

    return recipientListSpec;
}

编辑:基于以下讨论的解决方案的修订代码:

        StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                    .headerExpression("yyy", "payload[0].get(\"yyy\")")
                    )
            .gateway(new DirectChannel())
            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
                    .headerExpression("bbb", "payload[0].get(\"bbb\")")
                    )
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules).defaultOutputToParentFlow())
            .aggregate()
            .handle(cleanupAdapter).get();

共有1个答案

慕高阳
2023-03-14

不能在routeToreCipients()之后继续该流程。我的意思是您的.Aggrege()导致了该错误,因为RecipientListRouter不是AbstractReplyProductingMessageHandler,而只是普通的AbstractMessageHandler,并且它只根据为该路由器类型提供的映射知道在路由函数之后将消息发送到哪里。

有关更多信息,请参见DefaultOutputToparentFlow()JavaDocs:

/**
 * Make a default output mapping of the router to the parent flow.
 * Use the next, after router, parent flow {@link MessageChannel} as a
 * {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
 * @return the router spec.
 */
public S defaultOutputToParentFlow() {

因此,只有当它与收件人映射不匹配时,它才会继续您的流。还必须设置IgnorresendFailures(true),以使其不会出现错误而失败,而是返回到默认输出。

 类似资料:
  • 纠删码由配置定义,在创建纠删码存储池及其相关的 CRUSH 规则集时用到。 创建 Ceph 集群时初始化的、名为 default 的纠删码配置可提供与两副本相同的冗余水平,却能节省 25% 的磁盘空间。在此配置中 k=2 和 m=1 ,其含义为数据分布于 3 个 OSD ( k+m==3 )且允许一个失效。 为了在不增加原始存储空间需求的前提下提升冗余性,你可以新建配置。例如,一个 k=10 且

  • 本文向大家介绍拼写纠错是如何实现的?相关面试题,主要包含被问及拼写纠错是如何实现的?时的应答技巧和注意事项,需要的朋友参考一下   1、拼写纠错是基于编辑距离来实现;编辑距离是一种标准的方法,它用来表示经过插入、删除和替换操作从一个字符串转换到另外一个字符串的最小操作步数;   2、编辑距离的计算过程:比如要计算 batyu 和 beauty 的编辑距离,先创建一个7×8 的表(batyu 长度为

  • 我希望通过拆分项目、将每个项目路由到相应的网关并聚合结果来并行处理项目列表。但是,我的应用程序没有启动,我得到以下错误: 这是一个示例流定义,它说明了行为: 我看到Error'是spring-integration aggregator DSL的单向'MessageHandler',但是这里的解决方案不适用,我没有登录handle()方法。我还试图将。DefaultOutputToparentFl

  • 我试图完成本教程,但我认为在配置网页时遇到了一些问题。我正在一步一步地制作教程,但它不起作用。。 错误]无法执行目标com。github。eirslett:frontend maven插件:1.10.3:project shop上的webpack(webpack生成):无法运行任务:“webpack”。js'失败了。组织。阿帕奇。平民执行官。ExecuteException:进程已退出,但出现错误