当前位置: 首页 > 知识库问答 >
问题:

如何强制超时和取消异步完成未来作业

党建义
2023-03-14

我正在使用Java8,我想知道对3个异步作业强制超时的推荐方法,我将执行异步并从未来检索结果。请注意,所有3个作业的超时是相同的。如果超出时间限制,我还想取消作业。

我在想这样的事情:

// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync

List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
   for(CompletableFuture f : future) {
      if(!f.isDone()) {
         /*
         From Java Doc:
         @param mayInterruptIfRunning this value has no effect in this
             * implementation because interrupts are not used to control
             * processing.
         */
         f.cancel(true);
      }
   }
}

List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
   if(!fu.isCancelled()) { // Is this needed?
      output.add(fu.join());
   }
}

return output;

  1. 像这样的东西有用吗?有更好的方法吗?
  2. 如何正确地取消未来?Javadoc说,线程不能被中断?所以,如果我取消一个未来,并调用κ(),我会立即得到结果,因为线程不会被中断吗?
  3. 在等待结束后,是否建议使用run()或get()来获取结果?

共有2个答案

汝和裕
2023-03-14

调用cancel后,您不能加入furure,因为您会得到一个异常。

终止计算的一种方法是让它引用未来并定期检查:如果它被取消了,从内部中止计算。如果计算是一个循环,在每一次迭代中你都可以进行检查,那么就可以做到这一点。

你需要它是一个CompletableFuture吗?因为另一种方法是避免使用CompleatableFuture,而是使用一个简单的Future或FutureTask:如果您使用Exec在调用时执行它future.cancel(true)将在拥有时终止计算。

回答问题:“调用join(),我会立即得到结果吗”。

不,你不会马上得到它,它会挂起等待完成计算:没有办法强迫一个需要很长时间才能在较短时间内完成的计算。

你可以称之为未来。complete(value)提供一个值,该值将由具有该未来引用的其他线程用作默认结果。

危斯伯
2023-03-14

值得注意的是,在CompletableFuture上调用cancel实际上与在当前阶段调用completeExceptionally是一样的。取消不会影响之前的阶段。说完后说道:

  1. 原则上,假设不需要上游取消,这样的东西就可以工作(从伪代码的角度来看,上面有语法错误)。
  2. CloetableFuture取消不会中断当前线程。取消将导致所有下游阶段立即被C(取消)异常触发(将短路执行流程)。
  3. 在调用者愿意无限期等待的情况下,“加入”和“获取”实际上是相同的。加入处理为您包装选中的异常。如果调用者想要超时,将需要get。

包括一个细分来说明取消时的行为。请注意,下游流程不会启动,但上游流程即使在取消后也会继续。

    public static void main(String[] args) throws Exception
    {
        int maxSleepTime = 1000;
        Random random = new Random();
        AtomicInteger value = new AtomicInteger();
        List<String> calculatedValues = new ArrayList<>();
        Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
        List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
        List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
        List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
        CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
        try
        {
            /*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
            awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
        }
        catch(TimeoutException ex)
        {
            for(CompletableFuture<String> toCancel : stage2)
            {
                boolean irrelevantValue = false;
                if(!toCancel.isDone())
                    toCancel.cancel(irrelevantValue);
                else
                    calculatedValues.add(toCancel.join());
            }
        }
        System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
        System.out.println("Values returned as of cancellation: " + calculatedValues);
        Thread.sleep(maxSleepTime);
    }

    private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture) 
    {
        return baseFuture.thenApply(val -> {  System.out.println("Stage 2 Running"); return "#" + val; }); 
    }
    
    private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture) 
    { 
        return baseFuture.thenApply(val ->  { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); }); 
    }

如果需要取消上游进程(例如:取消某些网络调用),则需要自定义处理。

 类似资料:
  • 我是新的完全未来。我试图为元素列表(即参数)调用并行方法,然后将结果组合起来创建最终响应。我还试图设置50毫秒的超时,以便如果调用不返回50毫秒,我将返回默认值。 到目前为止,我已经尝试过: 但我一直得到错误说: 有人能告诉我我在这里做错了什么吗?如果我走错了方向,请纠正我。 谢谢

  • 问题内容: 我正在使用Java 8可完成的期货。我有以下代码: 我使用runAsync计划执行等待闩锁的代码。接下来,我取消了将来,希望将被中断的异常抛出内部。但是似乎线程在await调用上仍然处于阻塞状态,即使将来被取消(断言通过)也永远不会抛出InterruptedException。使用ExecutorService的等效代码可以正常工作。是CompletableFuture中的错误还是我的

  • 问题内容: 我有一个调用一些不检查线程中断的代码。调用之后,该方法将立即抛出(如预期的那样)。但是,由于后台任务的代码从不检查其线程是否被中断,因此它很乐意继续执行。 是否有等待后台任务 实际 完成的标准方法?我希望显示“正在取消…”消息或某种类似的内容,直到任务终止为止。(我确信如果有必要,我总是可以在worker类中使用一个标志来完成此操作,只需寻找其他解决方案即可。) 问题答案: 我玩了一点

  • 我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做

  • 我有一个Kafka分区,和一个parkStreaming应用程序。一个服务器有10个内核。当火花流从Kafka收到一条消息时,后续过程将需要5秒钟(这是我的代码)。所以我发现火花流读取Kafka消息很慢,我猜当火花读出一条消息时,它会等到消息被处理,所以读取和处理是同步的。我想知道我可以异步读取火花吗?这样从Kafka读取的数据就不会被后续处理拖动。然后火花会很快消耗来自Kafka的数据。然后我可

  • 我正在开发一个微服务应用程序,在服务布局中,我想使用可完成的未来.runAsync()调用。问题是当我想抛出异常时,我有自己的处理程序异常,但是当它在 CompletedFuture 中的捕获块中生成时,我无法捕获错误,如下所示: 控制器: 服务: 客户: 错误处理程序: 我不可能使用正确的形式。知道如何在supplyAsync块中抛出异常吗?