当前位置: 首页 > 知识库问答 >
问题:

我的Spring集成流的一些问题

越狐若
2023-03-14

我使用的是带有spring Integration 5.3.0的spring boot 2.3.0版本,不知何故,我无法让下面的代码正常工作。应用程序启动,没有错误,但当控制到NextChannelFlow()方法时,什么也没有发生或打印。谁能告诉我我错过了什么。任何帮助都很感激。谢了。

@Configuration
@EnableIntegration
@EnableRetry
    Class MyIntegrationFlow
    .
    .

    @Bean
    public IntegrationFlow upstreamFlow(){
    return IntegrationFlows.from(someChannel())
    .handle(srcDirectory(), "receive")                               
    .filter(onlyCsvfiles())                    
    .handle(targetDirectory())// returns a FileWritingMessageHandler
    .channel(nextChannel())     //this is downstream                            
    .get();
    }

@Bean
    public MessageHandler targetDirectory() throws IOException {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(targetFolder.getFile());
        handler.setExpectReply(false);
        return handler;
    }   

    @Bean
    public DirectChannel nextChannel(){
    return new DirectChannel();
    }

    @Bean
    public IntegrationFlow nextChannelFlow() {
//the below is that last line that gets printed in console. After this line, nothing gets printed and I see no errors too.
        System.out.println("inside nextChannel method ");

        return IntegrationFlows.from (nextChannel()) 
                .handle((GenericHandler<String>) (payload, headers) -> {
                    System.out.println("inside handle 1");
                    callSomeMethod();
                    System.out.println("inside handle 2");
                    return payload;                 
                })
                .log(Level.INFO, new LiteralExpression("came out of handle.")) //was trying to see if I can see any logs/errors at this line , but nothing displayed.
                .channel(checkXFBFlowChannel())//control doesn't go to this channel as well.
                .get();
    }   

    @Retryable(value={IllegalStateException.class}, maxAttempts=5,backoff=@Backoff(delay=3000))
    public void callSomeMethod() {
        System.out.println("simulate sample retry method");        
        throw new IllegalStateException() ;                  
    }

共有1个答案

公良英资
2023-03-14
.handle(returns MessageHandler)

如果它是一个普通的MessageHandler,那么在它之后就不会发生任何事情。它不返回任何东西(void),因此在此之后没有回复继续流。请考虑使用句柄(GenericHandler)

 类似资料:
  • 我有一个关于JMS和Spring集成的问题。 我有3个队列,让我们称它们为QUEUE_SOURCE、QUEUE_TARGET和QUEUE_ERROR。DefaultMessageListenerContainer用于从队列源读取消息。 我已经为这些队列配置了JMS事务管理器。 当我从QUEUE_源读取消息,但将消息发布到QUEUE_目标时出错时,我可以看到在抛出异常之前,消息会重试几次,从而触发回

  • 我尝试在Spring集成中访问flux对象,而不将流声明拆分为两个函数。我想知道如何执行以下操作: 我不介意将我在评论中提到的通量操作转移到另一个类(可能是为了作为某种网关),但对我来说,从同一个函数启动和流显然非常重要,因此它将非常清晰易读,能够理解我在应用程序中所做的事情。我看到了Monos网关的文档,但示例代码甚至不可能(它们讨论的是函数中没有的通量,作为初学者,我很难理解那里发生了什么)。

  • 我试图理解在Spring集成中聚合时返回的类型,这相当困难。我正在使用Project Reactor,我的代码片段是: 除了理解示例中传递的类型之外,我还想知道如何才能知道中流动的对象及其类型。

  • 我有一个SI DSL流,就像 因此,当满足一个条件时(请参见ifreplayineeded()),则必须再次调用processRequest()流。然而,并非必须执行整个流程,而是几乎在最后执行(- 看起来像(用于存储请求/响应和流中使用的其他数据) 和routeBack() 我肯定错过了一些概念,因为我得到了以下错误: 原因:org.springframework.beans.factory.B

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI

  • 我正在尝试将Spring Security性与LDAP集成。使用spring core版本4.0.5,spring security版本3.2.2和spring ldap版本1.3.2。这是我的安全配置xml