当前位置: 首页 > 知识库问答 >
问题:

Spring Integration-“当前组件”(routerFlow)是单向的“MessageHandler”,不适合配置“输出通道”

广昊昊
2023-03-14

我实现了如下所示的RecipientListRouter-

    @Bean
    public RecipientListRouter routerFlow() {
        RecipientListRouter router = new RecipientListRouter();
        router.setIgnoreSendFailures(true);
        router.setApplySequence(true);
        router.addRecipient("channelChkn", "headers.get('eventSubType').contains('CHKN')");
        router.addRecipient("channelBkd", "headers.get('eventSubType').contains('BKD')");
        router.addRecipient("channelBrd", "headers.get('eventSubType').contains('BRD')");
        router.addRecipient("channelAciRecCncl", "headers.get('eventSubType').contains('ACI-REC-CNCL')");
        router.addRecipient("channelSeatAsgn", "headers.get('eventSubType').contains('SEATNBR-')");
        router.addRecipient("channelDeboard", "headers.get('isDeBoarded') == true");
        router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
        LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
        return router;
    }

    @Bean
    public IntegrationFlow baseEventFlow() {

        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
                .filter(filterMessage, "rejectPastData")
                .transform(aciMessageTransformer, "parserXMLMessage")
                .route(routerFlow())
                // executor used to parallelise the multiple subscribe execution
                .publishSubscribeChannel(Executors.newCachedThreadPool(),
                        pubsub -> pubsub.subscribe(flow -> flow.channel("pubCountEvntChannel"))
                                .subscribe(flow -> flow.channel("pubTravlerEventChannel")))
                .get();

    }
router.addRecipient("pubSubChannel");

###编辑

    @Bean
    public IntegrationFlow baseEventFlow() {

        return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
                .filter(filterMessage, "rejectPastData")
                .transform(aciMessageTransformer, "parserXMLMessage")
                .gateway(eventFlow())
                .handle (test())
                .get()
   }

    @Bean
    public IntegrationFlow eventFlow() {
        return f -> f.route(routerFlow());
    }

    @Bean
    public RecipientListRouter routerFlow() {
        RecipientListRouter router = new RecipientListRouter();
        router.setIgnoreSendFailures(true);
        router.setApplySequence(true);
        router.addRecipient("channelChkn", "headers.get('eventSubType').contains('CHKN')");
        router.addRecipient("channelBkd", "headers.get('eventSubType').contains('BKD')");
        router.addRecipient("channelBrd", "headers.get('eventSubType').contains('BRD')");
        router.addRecipient("channelAciRecCncl", "headers.get('eventSubType').contains('ACI-REC-CNCL')");
        router.addRecipient("channelSeatAsgn", "headers.get('eventSubType').contains('SEATNBR-')");
        // router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-ASSIGN')");
        // router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-CHG')");
        router.addRecipient("channelDeboard", "headers.get('isDeBoarded') == true");
        //router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
        LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
        return router;
    }

共有1个答案

国胤
2023-03-14

您不能在路由器之后继续流。

如果路由器下游的各种流返回一个结果,则必须使用.gateway(...)元素调用路由器;结果将返回到主流。

 类似资料:
  • 我已经查看了Spring Integration and DSL Upgrade-One-way“MessageHandler”,配置“Output Channel”错误是不合适的,在我看来,我的解决方案的形状就在这里,但解决方案的表达方式对我来说是没有意义的。 spring-integration并行分裂路由聚合流失败,原因是单向MessageHandler更容易理解,但我没有看到处理程序方法(

  • 我们正在使用kafka来实现一个驱动事件应用程序,并在组内的消费者之间进行大量的再平衡。 CommitFailedException提交无法完成,因为组已重新平衡并将分区分配给另一个成员 我想尝试一下,但我不知道如何在代理中配置group.max.session.timeout.ms。 我还发现:

  • 现在我想编写一个类似这样的规则来生成这些文件: 但是SnakeMake要求两个输出模板都具有相同的通配符,而这些模板没有。此外,即使SnakeMake可以处理多个通配符,它也可能希望为模板找到一个与之匹配的文件名,但我希望该文件匹配与第一个模板的匹配的所有文件,即我希望是一个列表,而不是单个文件。因此,似乎这样做的方法是: 但是我不能这样做,因为lambda函数不能在输出部分中使用。 在我看来,这

  • 我正在尝试配置如下的日志,但是它抛出了这个错误。配置文件如下所示: -等等- 我收到以下错误。 我已经浏览了stackoverflow并纠正了其中的一些错误。但它仍然不起作用。请帮忙!!

  • 当我运行一个Hive语句并启动相应的MR作业时,它通常有如下一行: 数字1280804751,279261996966是什么单位?字节?街区?有没有办法将它们转换为人类可读的格式? “花费的MapReduce CPU总时间”是什么意思?“累计CPU”是什么意思?