我正在使用多线程执行器服务。跨15个线程,当前进程在转移到下一个执行器服务之前完成了15个线程,下一个执行器服务再次跨15个线程。我正在寻找代码帮助,如果步骤1执行器服务中的任何当前线程完成处理,我需要转到下一个执行器服务,它可以开始使用线程,而不是在步骤1中等待完成15个线程来完成。
我想利用完成的线程,然后转到步骤2,即使其他线程在步骤1中执行,只要步骤1中的每个线程完成,步骤2就可以抓取并继续处理。建议请
// Step 1
ExecutorService executor1 = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) {
Runnable worker = new Runnable("Step 1 Insert");
executor1.execute(worker); }
executor1.shutdown();
// Step 2
ExecutorService executor2 = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) {
Runnable worker = new Runnable("Step 2 Insert");
executor2.execute(worker); }
executor2.shutdown();
您可以使用ExecutorCompletionService。我稍微修改了JavaDoc中的示例
ExecutorService executor1 = Executors.newFixedThreadPool(15);
ExecutorService executor2 = Executors.newFixedThreadPool(15);
....
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor1);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
executor2.submit(r);
}
executor1.shutdown();
//and shutdown executor2 when you don't need it
Result
类只是一个例子,我假设它实现了Callable
或Runnable
,因此它可以提交到第二个执行器中。
您还可以将ExecutorService
的实例初始化为new ThreadPoolExecutor(0,15,10,TimeUnit.SECONDS,new LinkedBlockingQueue())
以终止现在不需要的线程,但您希望以后重用此执行器。
或者你可以简单地使用一个执行者。从ExecutorCompletionService
收到已完成的任务后,将其(或其结果)重新提交给同一执行者(感谢@Fildor)。但在这种情况下,您需要确定第一次完成任务的时间。
for (int i = 0; i < n;) {
Result r = ecs.take().get();
if (r != null && isCompletedFirstTime(r))//some user defined function
executor1.submit(r);
else
++i;//we need to know when to stop, otherwise we'll stuck in `take()`
}
@SpiderPig的答案是IMHO的一个很好的解决方案,但如果您想要解耦步骤,我想给出一个替代方案:
使用一个执行器,在你的情况下,需求似乎是一个有15个线程的固定线程池。
接下来是定义Step-Runnable,就像这样:
class StepX implements Runnable{
private final State _state; // Reference to the data to work on.
StepX( State state ){
_state = state;
}
public void run(){
// work on _state
executor.submit( new StepXplusOne( _state ) ); // reference executor in outer class and schedule next step.
}
}
您可以看到我使用了一个State
对象,它保存执行步骤和收集结果所需的所有数据。当然,您需要将StepX定义为Step1,Step2,...
在外类中,您只需提交N个Step1-Runnable,它将只使用您的15个线程并完成步骤。
我遗漏了一个方法,当所有步骤都完成时发出信号,因为有很多可能性可以做到这一点,我相信你可以自己选择一个。
为什么不能在同一个Runnable中执行所有步骤?例如
ExecutorService executor = Executors.newFixedThreadPool(15);
for (int i=0;i<=15;i++) {
Runnable worker = new Runnable() {
public void run() {
doStep1();
doStep2();
doStep3();
...
}
};
executor.execute(worker);
}
executor.shutdown();
异步Tcp客户端 异步Http客户端 异步Redis客户端 异步Mysql客户端 异步Log日志 异步文件读写 异常Exception
异步Log日志 use AsyncLog; yield AsyncLog::info('hello world'); yield AsyncLog::debug('test debug', ['foo' => 'bar']); yield AsyncLog::notice('hello world',[], 'group.com'); yield Async
我正在开发一个应用程序,不断地从Kafka主题中读取数据。这些数据是字符串格式的,然后我将其写入xml文件 要编写这些文件,我使用执行服务。 我想知道什么时候应该关闭我的执行服务。如果我的应用程序是有时间限制的,我会在executor实例上使用它,但我的应用程序应该持续运行。 如果发生任何异常,我的整个应用程序被杀死,它会自动关闭我的执行者吗?或者我应该捕获一个未检查的异常并关闭我的执行器,就像我
异常Exception 以传统的try,catch抓取异常 如果在业务层不catch,框架层会捕捉,并返回一个500的server error响应。 如果在开发环境会返回一个500的具体错误的trace响应。 try { throw new \Exception("Error Processing Request", 1); //yield throwExc
异步文件读写 读文件 use AsyncFile; $content = (yield AsyncFile::read(__ROOT__."runtime/test.txt")); 写文件 $res = (yield AsyncFile::write(__ROOT__."runtime/test.txt", "hello wordls!")); $res = (yi
异步Mysql客户端 AsyncMysql::query($sql, $usePool = true) 第二个参数设为false将不会使用连接池中的资源,默认都会从连接池中取,配置连接池数量 => config/database.php 具体使用 use AsyncMysql; //设置超时时间 AsyncMysql::setTimeout(2); $res = (
异步Redis客户端 连接池(连接池默认开启) use AsyncRedis; //关闭连接池 AsyncRedis::enablePool(false); //开启连接池 AsyncRedis::enablePool(true); 使用AsyncRedis use AsyncRedis; //设置超时时间 AsyncRedis::s
异步Http客户端 Get方式 1.使用域名形式 use AsyncHttp; //直接使用域名, get方式 $http = new AsyncHttp('http://groupco.com'); //设置2s超时 $http->setTimeout(2); //$http->setCookies(['token' => 'xxxx']);