当前位置: 首页 > 知识库问答 >
问题:

超时后停止java CompletableFuture中的线程

左丘繁
2023-03-14

我的java代码中有一个异步链,我想在某个超时后停止它,所以我创建了一个包含一些线程的线程池,并像这样调用CompletableFuture

ExecutorService pool = Executors.newFixedThreadPool(10);

然后我有一个循环方法,从数据库加载数据并对其执行一些任务,一旦所有CompletableFutures都完成了,它就会再次执行

CompletableFuture<MyObject> futureTask =
                CompletableFuture.supplyAsync(() -> candidate, pool)
                .thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
                .thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
                .thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
                .exceptionally(ExceptionHandlerService::handle);

我的问题是task6,它有一个非常密集的任务(它是一个网络连接任务,有时会永远挂起)。我注意到我的orTimeout在30秒后被正确触发,但运行task6的线程仍在运行

经过几次这样的循环,我所有的线程都耗尽了,我的应用程序也死了

达到超时后,如何取消池中正在运行的线程?(不调用pool.shutdown())

更新*在主线程中,我做了一个简单的检查,如下所示

for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
                unfinishedTasks = handleFutureTasks(unfinishedTasks, totalBatchSize);
                if(unfinishedTasks.isEmpty()) {
                    break;
                }
                if(i==0) {
                    //handle cancelation of the tasks
                    for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
                        **task.cancel(true);**
                        log.error("Reached timeout on task, is canceled: {}", task.isCancelled());
                    }
                    break;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ex) {
                }
            }

我看到的是,在几个周期之后,所有的任务都抱怨超时。。。在前1-2个周期中,我仍然得到了预期的响应(尽管有线程处理它)

我还是觉得线程池耗尽了

共有1个答案

赵朝
2023-03-14

我知道你没有打电话给泳池就说了。关机,但没有其他方法。但是,当您查看阶段时,它们将在“附加”它们的线程(添加那些然后应用)或您定义的池中的线程中运行。也许这是一个更有意义的例子。

public class SO64743332 {

    static ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);

        //simulateWork(4);

        CompletableFuture<String> f2 = f1.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationOne(x);
        });

        CompletableFuture<String> f3 = f2.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationTwo(x);
        });

        f3.join();
    }

    private static String dbCall() {
        simulateWork(2);
        return "a";
    }

    private static String transformationOne(String input) {
        return input + "b";
    }

    private static String transformationTwo(String input) {
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

上述代码的关键点是:模拟工作(4) 。运行注释掉的代码,然后取消注释。查看哪个线程将实际执行所有这些,然后应用。它可以是main或池中的同一个线程,这意味着尽管您定义了一个池,但该池中只有一个线程将执行所有这些阶段。

在这种情况下,您可以定义一个运行所有这些阶段的单线程执行器(比如说在方法内部)。这样,您可以控制何时调用ShutDownNow并可能中断(如果您的代码响应中断)正在运行的任务。下面是一个模拟的例子:

public class SO64743332 {

    public static void main(String[] args) {
        execute();
    }


    public static void execute() {

        ExecutorService pool = Executors.newSingleThreadExecutor();

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);
        CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));

        // give enough time for transformationOne to start, but not finish
        simulateWork(2);

        try {
            CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
                                               .orTimeout(4, TimeUnit.SECONDS);
            cf3.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            pool.shutdownNow();
        }

    }

    private static String dbCall() {
        System.out.println("Started DB call");
        simulateWork(1);
        System.out.println("Done with DB call");
        return "a";
    }

    private static String transformationOne(String input) {
        System.out.println("Started work");
        simulateWork(10);
        System.out.println("Done work");
        return input + "b";
    }

    private static String transformationTwo(String input) {
        System.out.println("Started transformation two");
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

运行此命令时,您应该注意到,transformationOne会启动,但由于shutdownow而中断。

这样做的缺点应该很明显,每次调用execute都会创建一个新的线程池。。。

 类似资料:
  • 问题内容: 我正在使用实现一个3线程池,并使用CountDownLatch监视所有线程的完成情况,以进行进一步处理。 我曾经等到所有线程完成。我希望此过程在超时(例如45秒)的情况下中止。我该如何实施? 问题答案: 使用的重载变体接受超时。

  • 问题内容: 我读了这句话: 主线程必须是完成执行的最后一个线程。当主线程停止时,程序终止。 是真的吗 我也知道“即使主线程死程序继续运行”。 据我了解:启动程序时,JVM创建一个线程来运行您的程序。JVM创建一个用于运行程序的用户线程。该线程称为主线程。该类的main方法是从主线程调用的。如果程序从主线程中产生新线程,它将停止直到最后一个线程死亡。 哪一个是对的? 问题答案: 当所有非守护程序线程

  • 问题内容: 我正在编写一段连接到服务器的代码,使用该连接会生成一堆线程并执行一堆“工作”。 在某些情况下,连接失败,我需要停止所有操作并从头开始创建新对象。 我想在对象之后进行清理,但在线程上调用thread.stop,但是此方法似乎已被弃用。 推荐的替代方法是什么?是否应该为每个线程编写自己的清理和退出方法?将线程设置为null?或者是其他东西? 问题答案: 看这里 : 在HowToStopAT

  • motioncfp.java:

  • 我通过创建固定数量的线程来使用执行器服务来进行HTTP GET数据检索。 当Tomcat停止时,我们会出现以下错误: 严重:web应用程序[/viewer]似乎已启动名为[ThreadExecutor_51616156]的线程,但未能停止该线程。这很可能会造成内存泄漏。 这是真的吗?在没有这些服务错误的情况下,如何正确停止tomcat。

  • 问题内容: 我有一个复杂的功能(优化),它有可能进入循环或仅花费太多时间,而允许的时间由用户设置。 因此,我试图使函数在单独的线程中运行,并在经过最大时间后停止运行。我使用与以下代码相似的代码,但是它不起作用,因此 我认为我没有正确使用函数“ join”,或者它没有做我所了解的。任何想法? 谢谢! 感谢您的回答,目前我在这里找到了一个更好的主意*。它可以工作,但仍使用不建议使用的“停止”功能。新的