当前位置: 首页 > 知识库问答 >
问题:

如何中断ComppletableFuture的底层执行

訾旭
2023-03-14

我知道CompletableFuture设计无法通过中断来控制其执行,但我想你们中的一些人可能有这个问题。CompletableFutures是组成异步执行的非常好的方法,但考虑到当未来被取消时您希望底层执行被中断或停止的情况,我们如何做到这一点?或者我们必须接受任何取消或手动完成的CompletableFuture都不会影响在那里工作以完成它的线程?

在我看来,这显然是一项无用的工作,需要执行者的时间。我想知道在这种情况下,什么方法或设计可能会有所帮助?

更新

这是一个简单的测试

public class SimpleTest {

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}

共有3个答案

宋英杰
2023-03-14

如果你使用

cf.get();

代替

cf.join();

等待完成的线程可能会中断。这咬了我一个**,所以我只是把它放在那里。然后,您需要进一步传播此中断/使用cf.cancel(...)才能真正完成执行。

吕征
2023-03-14

这个怎么样?

public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {

    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    final CompletableFuture<T> cf = new CompletableFuture<T>() {
        @Override
        public boolean complete(T value) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.completeExceptionally(ex);
        }
    };

    // submit task
    executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    return cf;
}

简单测试:

    CompletableFuture<String> cf = supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            System.out.println("got interrupted");
            return "got interrupted";
        }
        System.out.println("normal complete");
        return "normal complete";
    });

    cf.complete("manual complete");
    System.out.println(cf.get());

我不喜欢每次都必须创建Executor服务的想法,但也许您可以找到重用ForkJoinPool的方法

赵英资
2023-03-14

CompletableFuture与最终可能完成它的异步操作无关。

由于(与< code>FutureTask不同)该类对导致其完成的计算没有直接控制,取消被视为另一种形式的异常完成。方法< code>cancel与< code > complete exception(new cancellation exception())具有相同的效果。

甚至可能没有单独的线程在完成它(甚至可能有许多线程在处理它)。即使有,也没有从CompletableFuture到任何引用它的线程的链接。

因此,您无法通过CompletableFuture中断任何可能正在运行某个任务以完成该任务的线程。您必须编写自己的逻辑来跟踪任何线程实例,这些实例获取了对可完成未来的引用,以完成它。

这是一个我认为你可以逃脱的执行类型的例子。

public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(1);
    CompletableFuture<String> completable = new CompletableFuture<>();
    Future<?> future = service.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                if (Thread.interrupted()) {
                    return; // remains uncompleted
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return; // remains uncompleted
                }
            }
            completable.complete("done");
        }
    });

    Thread.sleep(2000);

    // not atomic across the two
    boolean cancelled = future.cancel(true);
    if (cancelled)
        completable.cancel(true); // may not have been cancelled if execution has already completed
    if (completable.isCancelled()) {
        System.out.println("cancelled");
    } else if (completable.isCompletedExceptionally()) {
        System.out.println("exception");
    } else {
        System.out.println("success");
    }
    service.shutdown();
}

这假定正在执行的任务设置为正确处理中断。

 类似资料:
  • 我预计从下面的代码中,由于超时,可转换的未来将停止处理。但我可以看到取消没有任何影响。 我可以看到2个完全期货仍在运行。

  • 问题内容: 我知道设计不能通过中断来控制其执行,但是我想其中有些人可能会遇到此问题。s是组成异步执行的一种非常好的方法,但是考虑到当您希望取消future时中断或停止基础执行时,我们该怎么做?还是我们必须接受,任何取消或手动完成的操作都不会影响正在执行该操作的线程? 我认为,那显然是一项无用的工作,需要花费执行者的时间。我想知道在这种情况下哪种方法或设计可能会有所帮助? 更新 这是一个简单的测试

  • 比如有对象 我只能按照顺序拿到数组['a', 'b', 'c', 'd', 'f'] 如果想给f赋值, 只能 如果想要写上面这个表达式的通用函数,要怎么写。

  • 我有一个,它将计算出的数据转发到: 我希望客户端/用户能够取消任务: 这不起作用,因为这将取消外部的,而不是executor服务中计划的内部未来。 是否有可能将外部未来的传播到内部未来?

  • 我需要放心地配置底层的Jackson ObjectMapper。我正在使用REST Assured编写REST API测试,我需要定义一些过滤器来注册ObjectMapper,它用于将我的对象序列化到JSON: }

  • 问题内容: 我最近在玩go,尝试制作一些服务器来响应tcp连接上的客户端。 我的问题是我该如何干净地关闭服务器并中断在以下调用中当前“被阻止”的go例程 func(* TCPListener)接受吗? 根据接受文件 Accept在侦听器接口中实现Accept方法;它等待下一个调用并返回通用Conn。 错误也很少被记录下来。 问题答案: 这是我一直在寻找的东西。也许将来会帮助某人。注意使用selec