我正在编写一个创建多个(7个)CompletableFutures的函数。这些期货基本上都做两件事:
当所有的7个期货都完成了工作,我想继续进一步的代码执行。因此,我使用allOf()然后对allOf()返回的Void CompletableFuture调用join()。
问题是,即使在所有的未来都已经执行(我可以看到CSV正在生成)之后,join()调用仍然被卡住,并且代码的进一步执行将永远被阻塞。
我试过以下几件事:
>
逐个等待每个将来,在每个将来之后调用join()。这是可行的,但代价是并发性。我不想这么做。
尝试使用带有超时的get()而不是join()。但是,这总是导致抛出异常(因为get总是超时),这是不希望的。
看到了这个JDK bug:https://bugs.openjdk.java.net/browse/jdk-8200347。不确定这是否是一个类似的问题。
尝试在没有join()或get()的情况下运行,这将无法保持线程的执行,也是不希望的。
创造所有未来的主要功能。
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
try {
// 1. DbCall 1
CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();
// 2. DbCall 2
CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();
// 3. DbCall 3
CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();
// 4. DbCall 4
CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();
// 5. DbCall 5
CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();
// 6. DbCall 6
CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();
// 7. DbCall 7
CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();
CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};
CompletableFuture.allOf(fAll).join();
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
} catch (Exception e) {
msgResponse.setMessageStatus(ERROR);
msgResponse.setErrorMessage("error");
}
return msgResponse;
}
每个fetchAndUploadCSV()函数如下所示:
public CompletableFuture<Void> fetchAndUploadCSV1() {
return CompletableFuture.supplyAsync(() -> {
try {
return someService().getAllData1();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).thenAccept(results -> {
try {
if (results.size() > 0) {
csvWriter.uploadAsCsv(results);
}
else {
log.info(" No data found..");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
这就是csvwriter.uploadascsv(results)
的样子-
public <T> void uploadAsCsv(List<T> objectList) throws Exception {
long objListSize = ((objectList==null) ? 0 : objectList.size());
log.info("Action=Start, objectListSize=" + objListSize);
ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
Info fileInfo = someClient.uploadFile(inputStream);
log.info("Action=Done, FileInfo=" + ((fileInfo==null ? null : fileInfo.getID())));
}
我在这里使用OpenCSV将数据转换为CSV流。而且我总能看到最后一条原木线。
预期结果:所有提取的数据、生成的CSV和CustomResponse都应返回已处理的数据,没有错误消息。
实际结果:取出所有数据,生成CSV,挂起主线程。
您可以在每个创建的completablefuture
上使用join
,而不会牺牲并发性:
public CustomResponse process() {
CustomResponse msgResponse = new CustomResponse();
List<CompletableFuture<Void>> futures = Arrays.asList(dataHelper.fetchAndUploadCSV1(),
dataHelper.fetchAndUploadCSV2(),
dataHelper.fetchAndUploadCSV3(),
dataHelper.fetchAndUploadCSV4(),
dataHelper.fetchAndUploadCSV5(),
dataHelper.fetchAndUploadCSV6(),
dataHelper.fetchAndUploadCSV7());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> {
msgResponse.setProcessed(true);
msgResponse.setMessageStatus("message");
return msgResponse;
})
.exceptionally(throwable -> {
msgResponse.setMessageStatus("ERROR");
msgResponse.setErrorMessage("error");
return msgResponse;
}).join();
}
AllOf
返回一个新的CompletableFuture
,当所有给定的CompletableFutures完成时,该值将完成。因此,当thenapply
中调用join
时,它会立即返回。本质上,加入正在发生在已经完成的期货上。这样就消除了阻塞。此外,为了处理可能的异常,应该调用exceptionary
。
当我使用完全的未来。allOf()组合javadoc中描述的独立可完成的未来,在提供给该方法的所有未来之后,它不能可靠地完成。例如。: 结果如下: 我希望日志“Joined”和“Completed allOf”写在“Completed f1”和“Completed f2”之后。为了让事情变得更加混乱,阵列中的未来顺序似乎是头等大事。如果我换了台词 到 结果输出更改为: 更糟糕的是,如果我多次运行完
问题内容: 用户!我的Microsoft VS代码有问题。当我使用方法运行代码 我有一个问题“由于线程未挂起,评估失败。” PS当我使用javac和java运行文件时,此代码有效。 我也有VS Code的问题另一个问题 我的密码 对不起,语法不好。你能帮助我吗? 问题答案: 我是中国学生,我也遇到同样的问题。我在百度找到了解决方案。 vscode的内置调试控制台不支持Java输入。因此,您需要在调
从RunAction的完成处理程序调用SCNAction似乎会挂起SceneKit。 触摸事件或旋转设备似乎可以解除挂起的障碍。 繁殖: 1)采取默认的SceneKit项目,你得到的启动与旋转的宇宙飞船。 2) 替换动画代码: 与: 3)在模拟器或真实设备上运行(两者问题相同) 4)结果是: > 宇宙飞船旋转正常 DONE ROTATE打印出来了OK 现在它挂起来了 轻触屏幕(或将设备旋转至横向)
问题内容: 如何在Android中从辅助线程调用主线程? 问题答案: 最简单的方法是从线程中调用runOnUiThread(…)
我试图用一个自定义对象创建一个新线程,然后从主线程调用这个自定义对象方法。其思想是,主线程可以继续执行其他任务,而自定义对象可以继续在第二个线程中工作: 输出为: 它应该更像这样: 所以主线程被阻塞,直到方法完成。主线程是否在第二个线程中等待完成(作为返回类型为空,我认为情况不会如此)?还是在第一个线程中执行,因此阻塞了它? 我知道使用下面的代码,我可以在另一个线程中执行,但它每次都会从头开始创建
这是我正在研究的完全未来的例子 首先我从SupplySync调用compose方法,在这里我执行方法composeMethod,有三毫秒的延迟,然后它将创建一个文件并返回一个字符串作为结果。完成后,我调用Run方法,它只打印一个方法,然后有一个非阻塞方法从主线程运行。 我在这里面临的问题是,主线程执行完nonblockingmethod()并在3毫秒延迟之前退出进程,而随后的composeMeth