当前位置: 首页 > 知识库问答 >
问题:

Executor服务上的异步进程

申屠飞
2023-03-14

我正在使用多线程执行器服务。跨15个线程,当前进程在转移到下一个执行器服务之前完成了15个线程,下一个执行器服务再次跨15个线程。我正在寻找代码帮助,如果步骤1执行器服务中的任何当前线程完成处理,我需要转到下一个执行器服务,它可以开始使用线程,而不是在步骤1中等待完成15个线程来完成。

我想利用完成的线程,然后转到步骤2,即使其他线程在步骤1中执行,只要步骤1中的每个线程完成,步骤2就可以抓取并继续处理。建议请

// Step 1 
ExecutorService executor1 = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) { 
    Runnable worker = new Runnable("Step 1 Insert"); 
    executor1.execute(worker); } 
executor1.shutdown(); 
// Step 2 
ExecutorService executor2 = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) { 
    Runnable worker = new Runnable("Step 2 Insert"); 
    executor2.execute(worker); } 
executor2.shutdown();

共有3个答案

左丘嘉言
2023-03-14

您可以使用ExecutorCompletionService。我稍微修改了JavaDoc中的示例

ExecutorService executor1 = Executors.newFixedThreadPool(15);
ExecutorService executor2 = Executors.newFixedThreadPool(15);

....

CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor1);
for (Callable<Result> s : solvers)
    ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
    Result r = ecs.take().get();
    if (r != null)
        executor2.submit(r);
}
executor1.shutdown();
//and shutdown executor2 when you don't need it

Result类只是一个例子,我假设它实现了CallableRunnable,因此它可以提交到第二个执行器中。

您还可以将ExecutorService的实例初始化为new ThreadPoolExecutor(0,15,10,TimeUnit.SECONDS,new LinkedBlockingQueue())以终止现在不需要的线程,但您希望以后重用此执行器。

或者你可以简单地使用一个执行者。从ExecutorCompletionService收到已完成的任务后,将其(或其结果)重新提交给同一执行者(感谢@Fildor)。但在这种情况下,您需要确定第一次完成任务的时间。

for (int i = 0; i < n;) {
    Result r = ecs.take().get();
    if (r != null && isCompletedFirstTime(r))//some user defined function
        executor1.submit(r);
    else 
        ++i;//we need to know when to stop, otherwise we'll stuck in `take()`
}
韦嘉颖
2023-03-14

@SpiderPig的答案是IMHO的一个很好的解决方案,但如果您想要解耦步骤,我想给出一个替代方案:

使用一个执行器,在你的情况下,需求似乎是一个有15个线程的固定线程池。

接下来是定义Step-Runnable,就像这样:

class StepX implements Runnable{
    private final State _state; // Reference to the data to work on.
    StepX( State state ){
        _state = state;
    }

    public void run(){ 
        // work on _state
        executor.submit( new StepXplusOne( _state ) ); // reference executor in outer class and schedule next step.
    }
} 

您可以看到我使用了一个State对象,它保存执行步骤和收集结果所需的所有数据。当然,您需要将StepX定义为Step1,Step2,...

在外类中,您只需提交N个Step1-Runnable,它将只使用您的15个线程并完成步骤。

我遗漏了一个方法,当所有步骤都完成时发出信号,因为有很多可能性可以做到这一点,我相信你可以自己选择一个。

鲁城
2023-03-14

为什么不能在同一个Runnable中执行所有步骤?例如

ExecutorService executor = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) { 
    Runnable worker = new Runnable() {
        public void run() {
            doStep1();
            doStep2();
            doStep3();
            ...
        }
    }; 
    executor.execute(worker);
}
executor.shutdown();
 类似资料:
  • 异步Tcp客户端 异步Http客户端 异步Redis客户端 异步Mysql客户端 异步Log日志 异步文件读写 异常Exception

  • 异步Log日志 use AsyncLog; yield AsyncLog::info('hello world'); yield AsyncLog::debug('test debug', ['foo' => 'bar']); yield AsyncLog::notice('hello world',[], 'group.com'); yield Async

  • 我正在开发一个应用程序,不断地从Kafka主题中读取数据。这些数据是字符串格式的,然后我将其写入xml文件 要编写这些文件,我使用执行服务。 我想知道什么时候应该关闭我的执行服务。如果我的应用程序是有时间限制的,我会在executor实例上使用它,但我的应用程序应该持续运行。 如果发生任何异常,我的整个应用程序被杀死,它会自动关闭我的执行者吗?或者我应该捕获一个未检查的异常并关闭我的执行器,就像我

  • 异常Exception 以传统的try,catch抓取异常 如果在业务层不catch,框架层会捕捉,并返回一个500的server error响应。 如果在开发环境会返回一个500的具体错误的trace响应。 try { throw new \Exception("Error Processing Request", 1); //yield throwExc

  • 异步文件读写 读文件 use AsyncFile; $content = (yield AsyncFile::read(__ROOT__."runtime/test.txt")); 写文件 $res = (yield AsyncFile::write(__ROOT__."runtime/test.txt", "hello wordls!")); $res = (yi

  • 异步Mysql客户端 AsyncMysql::query($sql, $usePool = true) 第二个参数设为false将不会使用连接池中的资源,默认都会从连接池中取,配置连接池数量 => config/database.php 具体使用 use AsyncMysql; //设置超时时间 AsyncMysql::setTimeout(2); $res = (

  • 异步Redis客户端 连接池(连接池默认开启) use AsyncRedis; //关闭连接池 AsyncRedis::enablePool(false); //开启连接池 AsyncRedis::enablePool(true); 使用AsyncRedis use AsyncRedis; //设置超时时间 AsyncRedis::s

  • 异步Http客户端 Get方式 1.使用域名形式 use AsyncHttp; //直接使用域名, get方式 $http = new AsyncHttp('http://groupco.com'); //设置2s超时 $http->setTimeout(2); //$http->setCookies(['token' => 'xxxx']);