当前位置: 首页 > 知识库问答 >
问题:

CompletableFuture join()调用挂起主线程。个人期货从未完成

施令雪
2023-03-14

我正在编写一个创建多个(7个)CompletableFutures的函数。这些期货基本上都做两件事:

  1. 使用supplyAsync()从某些数据库获取数据
  2. 使用thenAccept()将此数据写入CSV文件

当所有的7个期货都完成了工作,我想继续进一步的代码执行。因此,我使用allOf()然后对allOf()返回的Void CompletableFuture调用join()。

问题是,即使在所有的未来都已经执行(我可以看到CSV正在生成)之后,join()调用仍然被卡住,并且代码的进一步执行将永远被阻塞。

我试过以下几件事:

>

  • 逐个等待每个将来,在每个将来之后调用join()。这是可行的,但代价是并发性。我不想这么做。

    尝试使用带有超时的get()而不是join()。但是,这总是导致抛出异常(因为get总是超时),这是不希望的。

    看到了这个JDK bug:https://bugs.openjdk.java.net/browse/jdk-8200347。不确定这是否是一个类似的问题。

    尝试在没有join()或get()的情况下运行,这将无法保持线程的执行,也是不希望的。

    创造所有未来的主要功能

    public CustomResponse process() {
            CustomResponse msgResponse = new CustomResponse();
            try {
                // 1. DbCall 1
                CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();
    
                // 2. DbCall 2
                CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();
    
    
                // 3. DbCall 3
                CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();
    
    
                // 4. DbCall 4
                CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();
    
    
                // 5. DbCall 5
                CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();
    
    
                // 6. DbCall 6
                CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();
    
    
                // 7. DbCall 7
                CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();
    
    
                CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};
    
    
                CompletableFuture.allOf(fAll).join();
                msgResponse.setProcessed(true);
                msgResponse.setMessageStatus("message");
            } catch (Exception e) {
        msgResponse.setMessageStatus(ERROR);
                msgResponse.setErrorMessage("error");
            }
            return msgResponse;
        }
    

    每个fetchAndUploadCSV()函数如下所示:

    public CompletableFuture<Void> fetchAndUploadCSV1() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return someService().getAllData1();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).thenAccept(results -> {
                try {
                    if (results.size() > 0) {
                        csvWriter.uploadAsCsv(results);
                    }
                    else {
                        log.info(" No data found..");
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
    

    这就是csvwriter.uploadascsv(results)的样子-

    public <T> void uploadAsCsv(List<T> objectList) throws Exception {
            long objListSize = ((objectList==null) ? 0 : objectList.size());
            log.info("Action=Start, objectListSize=" + objListSize);
            ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
            Info fileInfo = someClient.uploadFile(inputStream);
            log.info("Action=Done, FileInfo=" + ((fileInfo==null ? null : fileInfo.getID())));
        }
    

    我在这里使用OpenCSV将数据转换为CSV流。而且我总能看到最后一条原木线。

    预期结果:所有提取的数据、生成的CSV和CustomResponse都应返回已处理的数据,没有错误消息。

    实际结果:取出所有数据,生成CSV,挂起主线程。

  • 共有1个答案

    屠泰平
    2023-03-14

    您可以在每个创建的completablefuture上使用join,而不会牺牲并发性:

    public CustomResponse process() {
        CustomResponse msgResponse = new CustomResponse();
    
        List<CompletableFuture<Void>> futures = Arrays.asList(dataHelper.fetchAndUploadCSV1(),
                dataHelper.fetchAndUploadCSV2(),
                dataHelper.fetchAndUploadCSV3(),
                dataHelper.fetchAndUploadCSV4(),
                dataHelper.fetchAndUploadCSV5(),
                dataHelper.fetchAndUploadCSV6(),
                dataHelper.fetchAndUploadCSV7());
    
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> {
                    msgResponse.setProcessed(true);
                    msgResponse.setMessageStatus("message");
                    return msgResponse;
                })
                .exceptionally(throwable -> {
                    msgResponse.setMessageStatus("ERROR");
                    msgResponse.setErrorMessage("error");
                    return msgResponse;
                }).join();
    }
    

    AllOf返回一个新的CompletableFuture,当所有给定的CompletableFutures完成时,该值将完成。因此,当thenapply中调用join时,它会立即返回。本质上,加入正在发生在已经完成的期货上。这样就消除了阻塞。此外,为了处理可能的异常,应该调用exceptionary

     类似资料:
    • 当我使用完全的未来。allOf()组合javadoc中描述的独立可完成的未来,在提供给该方法的所有未来之后,它不能可靠地完成。例如。: 结果如下: 我希望日志“Joined”和“Completed allOf”写在“Completed f1”和“Completed f2”之后。为了让事情变得更加混乱,阵列中的未来顺序似乎是头等大事。如果我换了台词 到 结果输出更改为: 更糟糕的是,如果我多次运行完

    • 问题内容: 用户!我的Microsoft VS代码有问题。当我使用方法运行代码 我有一个问题“由于线程未挂起,评估失败。” PS当我使用javac和java运行文件时,此代码有效。 我也有VS Code的问题另一个问题 我的密码 对不起,语法不好。你能帮助我吗? 问题答案: 我是中国学生,我也遇到同样的问题。我在百度找到了解决方案。 vscode的内置调试控制台不支持Java输入。因此,您需要在调

    • 从RunAction的完成处理程序调用SCNAction似乎会挂起SceneKit。 触摸事件或旋转设备似乎可以解除挂起的障碍。 繁殖: 1)采取默认的SceneKit项目,你得到的启动与旋转的宇宙飞船。 2) 替换动画代码: 与: 3)在模拟器或真实设备上运行(两者问题相同) 4)结果是: > 宇宙飞船旋转正常 DONE ROTATE打印出来了OK 现在它挂起来了 轻触屏幕(或将设备旋转至横向)

    • 问题内容: 如何在Android中从辅助线程调用主线程? 问题答案: 最简单的方法是从线程中调用runOnUiThread(…)

    • 我试图用一个自定义对象创建一个新线程,然后从主线程调用这个自定义对象方法。其思想是,主线程可以继续执行其他任务,而自定义对象可以继续在第二个线程中工作: 输出为: 它应该更像这样: 所以主线程被阻塞,直到方法完成。主线程是否在第二个线程中等待完成(作为返回类型为空,我认为情况不会如此)?还是在第一个线程中执行,因此阻塞了它? 我知道使用下面的代码,我可以在另一个线程中执行,但它每次都会从头开始创建

    • 这是我正在研究的完全未来的例子 首先我从SupplySync调用compose方法,在这里我执行方法composeMethod,有三毫秒的延迟,然后它将创建一个文件并返回一个字符串作为结果。完成后,我调用Run方法,它只打印一个方法,然后有一个非阻塞方法从主线程运行。 我在这里面临的问题是,主线程执行完nonblockingmethod()并在3毫秒延迟之前退出进程,而随后的composeMeth