我的java代码中有一个异步链,我想在某个超时后停止它,所以我创建了一个包含一些线程的线程池,并像这样调用CompletableFuture
ExecutorService pool = Executors.newFixedThreadPool(10);
然后我有一个循环方法,从数据库加载数据并对其执行一些任务,一旦所有CompletableFutures都完成了,它就会再次执行
CompletableFuture<MyObject> futureTask =
CompletableFuture.supplyAsync(() -> candidate, pool)
.thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
.thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
.thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
.exceptionally(ExceptionHandlerService::handle);
我的问题是task6,它有一个非常密集的任务(它是一个网络连接任务,有时会永远挂起)。我注意到我的orTimeout在30秒后被正确触发,但运行task6的线程仍在运行
经过几次这样的循环,我所有的线程都耗尽了,我的应用程序也死了
达到超时后,如何取消池中正在运行的线程?(不调用pool.shutdown())
更新*在主线程中,我做了一个简单的检查,如下所示
for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
unfinishedTasks = handleFutureTasks(unfinishedTasks, totalBatchSize);
if(unfinishedTasks.isEmpty()) {
break;
}
if(i==0) {
//handle cancelation of the tasks
for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
**task.cancel(true);**
log.error("Reached timeout on task, is canceled: {}", task.isCancelled());
}
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ex) {
}
}
我看到的是,在几个周期之后,所有的任务都抱怨超时。。。在前1-2个周期中,我仍然得到了预期的响应(尽管有线程处理它)
我还是觉得线程池耗尽了
我知道你没有打电话给泳池就说了。关机,但没有其他方法。但是,当您查看阶段时,它们将在“附加”它们的线程(添加那些然后应用
)或您定义的池中的线程中运行。也许这是一个更有意义的例子。
public class SO64743332 {
static ExecutorService pool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);
//simulateWork(4);
CompletableFuture<String> f2 = f1.thenApply(x -> {
System.out.println(Thread.currentThread().getName());
return transformationOne(x);
});
CompletableFuture<String> f3 = f2.thenApply(x -> {
System.out.println(Thread.currentThread().getName());
return transformationTwo(x);
});
f3.join();
}
private static String dbCall() {
simulateWork(2);
return "a";
}
private static String transformationOne(String input) {
return input + "b";
}
private static String transformationTwo(String input) {
return input + "b";
}
private static void simulateWork(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException e) {
System.out.println("Interrupted!");
e.printStackTrace();
}
}
}
上述代码的关键点是:
模拟工作(4)
。运行注释掉的代码,然后取消注释。查看哪个线程将实际执行所有这些
,然后应用
。它可以是
main
或池中的同一个线程,这意味着尽管您定义了一个池,但该池中只有一个线程将执行所有这些阶段。
在这种情况下,您可以定义一个运行所有这些阶段的单线程执行器(比如说在方法内部)。这样,您可以控制何时调用
ShutDownNow
并可能中断(如果您的代码响应中断)正在运行的任务。下面是一个模拟的例子:
public class SO64743332 {
public static void main(String[] args) {
execute();
}
public static void execute() {
ExecutorService pool = Executors.newSingleThreadExecutor();
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);
CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));
// give enough time for transformationOne to start, but not finish
simulateWork(2);
try {
CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
.orTimeout(4, TimeUnit.SECONDS);
cf3.get(10, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
pool.shutdownNow();
}
}
private static String dbCall() {
System.out.println("Started DB call");
simulateWork(1);
System.out.println("Done with DB call");
return "a";
}
private static String transformationOne(String input) {
System.out.println("Started work");
simulateWork(10);
System.out.println("Done work");
return input + "b";
}
private static String transformationTwo(String input) {
System.out.println("Started transformation two");
return input + "b";
}
private static void simulateWork(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException e) {
System.out.println("Interrupted!");
e.printStackTrace();
}
}
}
运行此命令时,您应该注意到,
transformationOne
会启动,但由于shutdownow
而中断。
这样做的缺点应该很明显,每次调用
execute
都会创建一个新的线程池。。。
问题内容: 我正在使用实现一个3线程池,并使用CountDownLatch监视所有线程的完成情况,以进行进一步处理。 我曾经等到所有线程完成。我希望此过程在超时(例如45秒)的情况下中止。我该如何实施? 问题答案: 使用的重载变体接受超时。
问题内容: 我读了这句话: 主线程必须是完成执行的最后一个线程。当主线程停止时,程序终止。 是真的吗 我也知道“即使主线程死程序继续运行”。 据我了解:启动程序时,JVM创建一个线程来运行您的程序。JVM创建一个用于运行程序的用户线程。该线程称为主线程。该类的main方法是从主线程调用的。如果程序从主线程中产生新线程,它将停止直到最后一个线程死亡。 哪一个是对的? 问题答案: 当所有非守护程序线程
问题内容: 我正在编写一段连接到服务器的代码,使用该连接会生成一堆线程并执行一堆“工作”。 在某些情况下,连接失败,我需要停止所有操作并从头开始创建新对象。 我想在对象之后进行清理,但在线程上调用thread.stop,但是此方法似乎已被弃用。 推荐的替代方法是什么?是否应该为每个线程编写自己的清理和退出方法?将线程设置为null?或者是其他东西? 问题答案: 看这里 : 在HowToStopAT
motioncfp.java:
我通过创建固定数量的线程来使用执行器服务来进行HTTP GET数据检索。 当Tomcat停止时,我们会出现以下错误: 严重:web应用程序[/viewer]似乎已启动名为[ThreadExecutor_51616156]的线程,但未能停止该线程。这很可能会造成内存泄漏。 这是真的吗?在没有这些服务错误的情况下,如何正确停止tomcat。
问题内容: 我有一个复杂的功能(优化),它有可能进入循环或仅花费太多时间,而允许的时间由用户设置。 因此,我试图使函数在单独的线程中运行,并在经过最大时间后停止运行。我使用与以下代码相似的代码,但是它不起作用,因此 我认为我没有正确使用函数“ join”,或者它没有做我所了解的。任何想法? 谢谢! 感谢您的回答,目前我在这里找到了一个更好的主意*。它可以工作,但仍使用不建议使用的“停止”功能。新的