当前位置: 首页 > 知识库问答 >
问题:

使用ExecutorService并行处理作业

孙化
2023-03-14

我正在编写一个需要处理大量URL的java程序
每个URL将按顺序运行以下作业:下载、分析、压缩

我希望每个作业都有固定数量的线程,这样所有作业在任何给定时间都有并发运行的线程,而不是每个URL一次使用一个线程来完成所有作业。

例如,下载作业将有多个线程来获取和下载URL,一旦其中一个URL被下载,它就会将其传递给分析作业中的一个线程,一旦完成,它就会传递给压缩作业中的一个线程,等等。

我正在考虑在java中使用CompletionService,因为它一完成就返回一个结果,但我不确定它是如何工作的,到目前为止,我的代码如下所示:

ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<DownloadedItem> completionService = new ExecutorCompletionService<DownloadedItem>(executor);

//while list has URL do {
   executor.submit(new DownloadJob(list.getNextURL());//submit to queue for download
//}

//while there is URL left do {
   Future<DownloadedItem> downloadedItem = executor.take();//take the result as soon as it finish
   //what to do here??
//}

我的问题是如何将下载的项目移动到分析作业并在那里完成工作而不等待所有下载作业完成?我正在考虑为每个作业创建一个CompletionService,这是一种可行的方法吗?如果不是,是否有更好的替代方法来解决这个问题?请提供示例。

共有3个答案

阳兴文
2023-03-14

您所描述的称为Pipeline。基本上下载任务的输出是分析任务的输入。分析的输出是压缩的输入。似乎有两个选项可以完成此操作:

1) 让下载任务知道输出的管道,这样它就可以自己提交结果。

class DownloadTask implement Runnable {
    Executor analyzePipeline;
    public void run() {
        //Do download stuff
        analyzePipeline.submit(new AnalyzeTask(downloaded content));
    }
}

2) 允许另一个线程将下载任务的结果移动到分析任务的管道中。

ExecutorService executor = Executors.newFixedThreadPool(3);
ExecutorService analyzeExecutor = Executors.newFixedThreadPool(3);
CompletionService<DownloadedItem> completionService = new ExecutorCompletionService<DownloadedItem>(executor);

while list has URL do {
   executor.submit(new DownloadJob(list.getNextURL());//submit to queue for download
}

new Thread() {
    public void run() {
        while there is URL left do {
            Future<DownloadedItem> downloadedItem = executor.take();//take the result as soon as it finish
            analyzeExecutor.submit(new AnalyzeJob(downloadedItem.get());
        }
    }
};    
//...and so on
卫乐童
2023-03-14

你很接近。首先将任务提交给完成服务

completionService.submit(new DownloadJob(list.getNextURL());

现在抓住未来并等待它:

DownloadedItem> downloadedItem = executor.take().get();

调用get()可能会阻塞。重复上面的行,重复您提交的项目的次数。

如果您需要更大的吞吐量(在您的情况下,一次最多下载三个URL),请考虑async http client,它将允许您同时从数千个URL下载。它使用NIO,是事件驱动的,不涉及线程。

东门涵育
2023-03-14

一旦您提到IN ORDER,任何为有序任务使用单独线程的尝试只会使系统的设计复杂化。

在我看来,最好的办法是让不同的线程同时处理不同的URL。要完成这三个步骤,可以引入另一个抽象(比如使用3个可调用项),但仍然希望在一个线程中顺序执行它们。无需完成服务。

 类似资料:
  • 问题内容: 我有一个很大的数组,我想通过将它的片段交给一些异步任务来处理。作为概念证明,我编写了以下代码: 之后,将使用0到9之间的随机整数初始化。 该函数使用它们各自的插槽作为累加器,分派10个任务,这些任务从50000个项目的不相交的块中将它们相加并相加。 该程序在行崩溃。错误是: 我可以看到,在调试器中,它在崩溃前已经进行了几次迭代,并且在崩溃时变量具有正确范围内的值。 我读过尝试访问已发布

  • 我正在处理大量的4K图像,通过在图像的小(64x64像素)补丁上计算一个参数。这项任务现在是按顺序进行的,一个补丁一个补丁。下面复制了一段我的代码来向您展示这个想法。 结果: 顺序:15754毫秒 并行:5899 ms

  • 我已经用RxJava成功地完成了一个小型Java程序。代码为: 使用此代码,一切正常。现在我正在尝试将此代码传递给Android: 在finished()方法中,我正在更新GUI(finishedListener是当前活动正在实现的接口)。 我在使用map(I)的线路上遇到错误- 内置。gradle(用于应用程序)我正在使用: 我如何解决这个问题?

  • 我的Spring批处理作业每3分钟运行一次。 步骤应为 每个用户的记录应该并行执行。每个用户最多可以有150k条记录。 每个用户都可以有更新和删除记录。更新记录应在删除之前运行。 更新/删除集应该自己并行运行。但严格来说,所有更新都应该在删除之前完成。 有谁能提出在多个级别实现并行性的最佳方法,并遵循更新和删除级别的顺序吗。我正在研究Spring异步执行器服务、并行流和其他Spring库。Rx,仅

  • 我正在使用在每一行上执行一个函数,这需要很长时间,为了加快速度,有没有一种方法可以使用并行处理,使多个核心在不同的行上并发工作? 例如,我将PRISM天气数据(https://prism.oregonstate.edu/)聚合到州一级,同时按人口加权。这是基于https://www.patrickbaylis.com/blog/2021-08-15-pop-weighted-weather/. 请

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“