当前位置: 首页 > 知识库问答 >
问题:

为什么我的线程不等待CompletableFutures以“allOf()`”完成?

邹涵畅
2023-03-14

我正在学习Java 1.8中的CompletableFuture,在理解所有代码时遇到了困难。主线程似乎不会等待任何可完成的未来完成。

看见https://github.com/nurkiewicz/reactive/blob/master/src/test/java/be/more/reactive/S03_AllOf.java例如,我正在测试。

测试作业在打印任何结果之前完成。

有两种(丑陋的?)方法可以规避这一点:1)在主线程上设置超时并等待两者都完成。2)在末尾设置一个. get(),它将成为一个阻塞任务。

这是为什么呢?

代码片段:

package be.more.reactive;

import be.more.reactive.util.BaseTest;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class S03_AllOf extends BaseTest {

    private static final Logger log = LoggerFactory.getLogger(S03_AllOf.class);

    private final CompletableFuture<String> futureResult1 = getFutureQueryResult("1"); //.exceptionally() ??
    private final CompletableFuture<String> futureResult2 = getFutureQueryResult("2");
    private final CompletableFuture<String> futureResult3 = getFutureQueryResult("3");
    private final CompletableFuture<String> futureResult4 = getFutureQueryResult("4");

    @Test
    public void allOf() throws Exception {
        final CompletableFuture<Void> futureResult = CompletableFuture.allOf(   //Void ?? I want List<String>
                futureResult1, futureResult2, futureResult3, futureResult4
        );

//        futureResult.thenAccept((Void vd) -> vd.??)   //no, it won't work

        futureResult.thenRun(() -> {
            try {
                log.debug("Query result 1: '{}'", futureResult1.get());
                log.debug("Query result 2: '{}'", futureResult2.get());
                log.debug("Query result 3: '{}'", futureResult3.get());
                log.debug("Query result 4: '{}'", futureResult4.get());   //a lot of manual work

                log.debug("Now do on complete");    //handling onComplete
            } catch (Exception e) {
                log.error("", e);
            }
        });

    }

}

在BaseTest中:

protected CompletableFuture<String> getFutureQueryResult(final String queryId) {
    return CompletableFuture.supplyAsync(
            () -> db.apply(new Query(queryId))

    );
}

以DB为单位。Java语言

package be.more.reactive.db;

import java.util.concurrent.TimeUnit;

import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.apache.commons.lang3.RandomUtils.nextLong;

public class DB {
    public String apply(Query query) {
        try {
            TimeUnit.SECONDS.sleep(nextLong(2, 4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.format("%s_%s", randomAlphabetic(nextInt(4, 12)), query.getId());
    }
}

共有2个答案

吴展
2023-03-14

您看到的行为并非意外。当您创建一个可完成的未来时,您基本上是在计划一个异步运行的工作。

我们可以这样创建一个可完成的未来

var myFuture = CompletableFuture.supplyAsync(() -> myLongRunningOperation());

>

  • CompletableFuture将在单独的线程上调用myLongRunningAction

    CompletableFuture使用ExecutorService运行任务,可以在创建CompletableFuture期间提供。

    如果未提供ExecutorService,它将使用ForkJoinPool#commonPool提供的服务,该服务提供守护程序线程的线程池。

    任务()-

    在您的测试方法中,这是幕后发生的事情

    @Test
    public void allOf() throws Exception {
        // Schedules a computation (futureResult) on a different thread whose only 
        // work is to wait for the futures futureResult1, futureResult2, futureResult3 
        // and futureResult4 to complete
        final CompletableFuture<Void> futureResult = CompletableFuture.allOf(
                futureResult1, futureResult2, futureResult3, futureResult4
        );
    
        //  Schedules a computation that prints the results AFTER the futureResult is complete.
        futureResult.thenRun(() -> {
            try {
                log.debug("Query result 1: '{}'", futureResult1.get());
                log.debug("Query result 2: '{}'", futureResult2.get());
                log.debug("Query result 3: '{}'", futureResult3.get());
                log.debug("Query result 4: '{}'", futureResult4.get());
                log.debug("Now do on complete");
            } catch (Exception e) {
                log.error("", e);
            }
        });
    
        // Nothing more to do, so exit
    
    }
    

    但是,当您调用时。join()。get()执行测试的线程(主线程)将在退出之前等待计划的计算完成。

    因此,如果您希望您的测试等待计划的计算在它存在之前完成,

    //  Schedules a computation that prints the results ONCE the futureResult is complete.
    final CompletableFuture<Void> myFuture = futureResult.thenRun(() -> {
        try {
            log.debug("Query result 1: '{}'", futureResult1.get());
            log.debug("Query result 2: '{}'", futureResult2.get());
            log.debug("Query result 3: '{}'", futureResult3.get());
            log.debug("Query result 4: '{}'", futureResult4.get());   //a lot of manual work
    
            log.debug("Now do on complete");    //handling onComplete
        } catch (Exception e) {
            log.error("", e);
        }
    });
    
    // Wait for the myFuture to complete (sucessfully or throw an exception) before continuing.
    myFuture.get();
    

    在主线程上设置超时以等待将来完成是一种反模式。

    >

    如果您不关心结果,但希望应用程序等待将来完成,那么请创建一个自定义执行器服务,创建非守护进程线程。

  • 郭兴文
    2023-03-14

    来自Javadoc

    返回一个新的CompletableFuture,该新的CompletableFuture在所有给定CompletableFutures完成时完成。

    未来是一个异步任务,在调用get之前不会阻塞(仅当任务仍在运行时才会阻塞)。

    在这种情况下,CompleteableFuture是所有CompletableFutures的复合Future。这个未来仍然是一个阻塞异步调用,您必须调用get加入以等待所有期货完成。同样,来自javadoc

    在继续一个程序之前完成未来,如:CompletableFuture。所有(c1、c2、c3)。join()

    在我看来,你的解决方案既不是丑陋的,也不是意外的功能。

     类似资料:
    • 问题内容: 为什么线程不等待?线程启动,然后进入等待池,但是在那一刻之后它将继续执行。 问题答案: 您正在线程对象本身上进行同步,这是错误的用法。即将发生的事情是,即将死去的执行线程总是调用其对象: 依赖于this。因此,很清楚为什么在其中有或没有自己的情况下都会得到相同的行为。 解决方案:使用单独的对象进行线程协调;这是标准做法。

    • 在下面的代码中,为什么主线程要等到子线程完成。 Driver.java ThreadRunner.java 调用“t.start()”后,在驱动程序类中,程序是否应该退出?我没有使用join,但主线程仍在等待,直到新旋转的“ThreadRunner”运行开始。这是因为在java中,主线程(由main方法启动)总是等待所有线程关闭吗?

    • 问题内容: 因此,我有一些代码等待X发生,然后创建一个线程并执行processEmail。 我正在寻找的是一种代码,即使processEmail在另一个线程中发生,代码也可以继续等待X,但是当前代码只是等待线程完成,然后再等待X再次发生。 编辑:仅供参考,我什么都不需要在下面的代码中进一步输出processEmail.main(),因此不需要我等待其输出。 由Jean提供的答案:移除main之后的

    • 问题内容: 问题描述 : - 步骤1: 在主线程中从用户那里获取输入FILE_NAME。 步骤2: 对该文件执行10个操作(即,计数字符,计数行等。),所有这10个操作必须位于单独的线程中。这意味着必须有10个子线程。 步骤3: 主线程等待,直到所有那些子线程完成。 步骤4: 打印结果。 我做了什么 :- 我用3个线程做了一个示例代码。 我不希望您遇到文件操作代码。 问题:- 我上面的代码没有给出

    • 我想在C#中处理子目录和文件的文件系统/文件夹。我正在使用TPL库中的任务。这个想法是递归地执行它并为每个文件夹创建一个任务。主线程应该等待子线程完成,然后打印一些信息。事实上我只是想知道扫描何时完成。我已经开始使用线程池,然后切换到TLP。做了一些简单的例子。经过一些尝试从简单的代码到越来越臃肿的代码我被困在这里: 主线程有时仍然过早地继续,而不是在完成所有其他线程之后继续。(我对C#比较陌生,

    • 我正在努力学习如何正确使用async Wait,但我对它有点共同的想法。 在片段中,我试图构建一个对象数组,其中包含我需要的关于我在组件中上传的文件的信息。问题是this.fileInfo中的对象并没有完全等待返回编码图像的promise,而是在我console.logthis.fileInfo时返回此输出: 如您所见,关键图像是一个值未定义的ZoneAwarePromise。你能帮我修一下吗?