当前位置: 首页 > 知识库问答 >
问题:

如何在spring集成中并行同步处理?

公西光华
2023-03-14

在Spring集成中是否可以保持通道同步(发送消息后获得确认)但同时处理更多消息(并行处理)而无需使用线程创建自己的代码(即ExecutorService执行和提交工作人员)并等待它们?我想通过FTP上传文件,但同时上传更多文件,而无需在代码中创建自己的线程。我需要知道所有文件何时上传(这就是我希望它是同步的原因)。是否可以通过Spring集成配置,如果可以,如何?

共有2个答案

蒋嘉实
2023-03-14

在Spring中,通过使用异步任务处理,这是非常可能的。

首先创建一个将异步执行任务的服务。这里请注意performTask方法上的Async注释,它将被spring扫描并标记为异步执行。

import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
public class AsyncTask {

    @Async
    public Future<Result> performTask(String someArgument) {
        // put the business logic here and collect the result below
        Result result = new Result(); // this is some custom bean holding your result
        return new AsyncResult<Result>(result);
    }
}

接下来创建一个组件(可选-也可以来自任何其他现有服务),该组件将调用上述服务。

import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AsyncClass {

    @Autowired
    private AsyncTask asyncTask;

    public void doAsyncOperation() throws Exception {

    List<Future<Result>> futures = new ArrayList<Future<Result>>();

    for (int i = 1; i < 10; i++) {
        // Simulate multiple calls
        Future<Result > future = doAsync(String.valueOf(i));            
        futures.add(future);
    }

    for (Future<Result > future : futures) {
            // fetch the result
            Result result = future.get();
            // process the result
    }
}

    private Future<Result> doAsync(final String someArgument) {

        // this will immediately return with a placeholder Future object which
        // can be used later to fetch the result
        Future<Result> future = asyncTask.performAsync(someArgument);
        return future;
    }
}

启用异步所需的xml配置示例如下(对于基于注释的配置,请使用@EnableAsync)

<task:annotation-driven executor="myExecutor" />
<task:executor id="myExecutor" pool-size="30" rejection-policy="CALLER_RUNS"/>

有关详细文档,请参阅此处

宗沛
2023-03-14

看起来你需要一些流,比如:

>

  • <代码>

    <代码>

    <代码>

    <代码>

    <代码>

    如果有什么不清楚,请告诉我。

    更新

    如何在Spring集成Java DSL中做到这一点?有什么例子吗?

    像这样的东西:

    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    public class Configuration {
    
        @Bean
        public IntegrationFlow uploadFiles() {
            return f ->
                       f.split()
                           .handle(Ftp.outboundGateway(this.ftpSessionFactory,
                               AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'"))
                           .aggregate();
        }
    
    }
    
    @MessagingGateway(defaultRequestChannel = "uploadFiles.input") 
    interface FtpUploadGateway {
    
        List<String> upload(List<File> filesToUpload);
    
    }
    

  •  类似资料:
    • 在Spring integration中,我必须处理动态通道创建,但当我调试应用程序时,我看到不同通道之间的“阻塞”问题。 我知道是一个公共通道,在父上下文中共享,但如何为每个子上下文开发一个完整的独立场景?。公共网关是问题所在吗? 我在Spring integration flow async中看到了post错误处理,但对于每个子级,我都有一个完整的分离环境,我希望利用这些动态分离的优势。这可能

    • 如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }

    • 我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫

    • Spring批处理-需要帮助以并行和多个节点运行批处理作业的独立步骤。一个spring批处理作业(JobA),包含三个步骤[步骤A(在compute1中)和步骤B(在compute2中)以及步骤C] StepA和StepB是独立的步骤,占用大量内存,因此不能在同一计算节点/JVM上并行运行。要使StepC同时启动(StepA和StepB),需要成功完成。我不想为了节省时间而依次执行步骤A和步骤B。

    • null 如何在transform()步骤中添加Jaxb2Marshaller?

    • 问题内容: 我具有以下Spring Integration配置,该配置允许我从MVC Controller调用网关方法并让控制器返回,而集成流将在不阻塞控制器的单独线程中继续进行。 但是,我无法弄清楚如何使我的错误处理程序为该异步流工作。我的网关定义了错误通道,但是由于某种原因我的异常没有到达。相反,我看到被调用了。 网关: 为了查看我的错误处理程序正在处理的异步集成流程中发生的异常,我该怎么办?