我正在使用Java8,我想知道对3个异步作业强制超时的推荐方法,我将执行异步并从未来检索结果。请注意,所有3个作业的超时是相同的。如果超出时间限制,我还想取消作业。
我在想这样的事情:
// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture f : future) {
if(!f.isDone()) {
/*
From Java Doc:
@param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
if(!fu.isCancelled()) { // Is this needed?
output.add(fu.join());
}
}
return output;
κ()
,我会立即得到结果,因为线程不会被中断吗?调用cancel后,您不能加入furure,因为您会得到一个异常。
终止计算的一种方法是让它引用未来并定期检查:如果它被取消了,从内部中止计算。如果计算是一个循环,在每一次迭代中你都可以进行检查,那么就可以做到这一点。
你需要它是一个CompletableFuture吗?因为另一种方法是避免使用CompleatableFuture,而是使用一个简单的Future或FutureTask:如果您使用Exec在调用时执行它future.cancel(true)将在拥有时终止计算。
回答问题:“调用join(),我会立即得到结果吗”。
不,你不会马上得到它,它会挂起等待完成计算:没有办法强迫一个需要很长时间才能在较短时间内完成的计算。
你可以称之为未来。complete(value)提供一个值,该值将由具有该未来引用的其他线程用作默认结果。
值得注意的是,在CompletableFuture上调用cancel实际上与在当前阶段调用completeExceptionally是一样的。取消不会影响之前的阶段。说完后说道:
包括一个细分来说明取消时的行为。请注意,下游流程不会启动,但上游流程即使在取消后也会继续。
public static void main(String[] args) throws Exception
{
int maxSleepTime = 1000;
Random random = new Random();
AtomicInteger value = new AtomicInteger();
List<String> calculatedValues = new ArrayList<>();
Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
try
{
/*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex)
{
for(CompletableFuture<String> toCancel : stage2)
{
boolean irrelevantValue = false;
if(!toCancel.isDone())
toCancel.cancel(irrelevantValue);
else
calculatedValues.add(toCancel.join());
}
}
System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
System.out.println("Values returned as of cancellation: " + calculatedValues);
Thread.sleep(maxSleepTime);
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 2 Running"); return "#" + val; });
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); });
}
如果需要取消上游进程(例如:取消某些网络调用),则需要自定义处理。
我是新的完全未来。我试图为元素列表(即参数)调用并行方法,然后将结果组合起来创建最终响应。我还试图设置50毫秒的超时,以便如果调用不返回50毫秒,我将返回默认值。 到目前为止,我已经尝试过: 但我一直得到错误说: 有人能告诉我我在这里做错了什么吗?如果我走错了方向,请纠正我。 谢谢
问题内容: 我正在使用Java 8可完成的期货。我有以下代码: 我使用runAsync计划执行等待闩锁的代码。接下来,我取消了将来,希望将被中断的异常抛出内部。但是似乎线程在await调用上仍然处于阻塞状态,即使将来被取消(断言通过)也永远不会抛出InterruptedException。使用ExecutorService的等效代码可以正常工作。是CompletableFuture中的错误还是我的
问题内容: 我有一个调用一些不检查线程中断的代码。调用之后,该方法将立即抛出(如预期的那样)。但是,由于后台任务的代码从不检查其线程是否被中断,因此它很乐意继续执行。 是否有等待后台任务 实际 完成的标准方法?我希望显示“正在取消…”消息或某种类似的内容,直到任务终止为止。(我确信如果有必要,我总是可以在worker类中使用一个标志来完成此操作,只需寻找其他解决方案即可。) 问题答案: 我玩了一点
我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可
我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做
我正在开发一个微服务应用程序,在服务布局中,我想使用可完成的未来.runAsync()调用。问题是当我想抛出异常时,我有自己的处理程序异常,但是当它在 CompletedFuture 中的捕获块中生成时,我无法捕获错误,如下所示: 控制器: 服务: 客户: 错误处理程序: 我不可能使用正确的形式。知道如何在supplyAsync块中抛出异常吗?