当前位置: 首页 > 知识库问答 >
问题:

如何使用 java future 包装供应商,该未来在超时时返回值,但在后台继续运行(具有取消选项)

谢夜洛
2023-03-14

例如,我有一个供应商可能需要时间运行:

Supplier<Integer> numLoader = sneaky(() -> {
    Thread.sleep(10000);
    System.out.println("5 Calculated!");
    return 5;
});

* sneaky 只是一个转换为运行时异常的实用程序。

我希望能够做这样的事情:

Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
// numLoader takes 10 seconds to complete so -1 is returned.
int num = future.get(1000, TimeUnit.MILLISECONDS);
if (resourcesAreLow()) {
    future.cancel(true);
}
doSomethingWithTheValue(num);

我还有一个createFutureValueOnTimeout的部分实现:

private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
    CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
    return new FutureDecorator<V>(completableFuture) {
        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            return completableFuture.completeOnTimeout(v, timeout, unit).get();
        }
    };
}

问题在于,调用取消时,睡眠不会中断。

  1. 我怎样才能让取消生效
  2. 有没有更简单的方法可以在超时时返回值

完整测试:

public class TimeoutTest {
    @SneakyThrows
    @Test
    public void testTimeout() {
        int loadTimeMillis = 10000;
        Supplier<Integer> numLoader = () -> {
            try {
                // Simulate long operation
                Thread.sleep(loadTimeMillis);
            } catch (InterruptedException e) {
                System.out.println("Interrupted! message: " + e.getMessage());
                throw Lombok.sneakyThrow(e);
            }
            System.out.println("5 Calculated!");
            return 5;
        };

        Future<Integer> future = createFutureValueOnTimeout(-1, numLoader);
        
        long start = System.currentTimeMillis();

        // numLoader takes 10 seconds to complete so -1 is returned.
        int num = future.get(1000, TimeUnit.MILLISECONDS);

        System.out.println("Got: num: " + num + ". time: " + (System.currentTimeMillis() - start));

        if (resourcesAreLow()) {
            future.cancel(true);
        }
        // Don't stop the test. Give time for the cancel to kick in.
        Thread.sleep(loadTimeMillis);
        System.out.println("Finished. Time: " + (System.currentTimeMillis() - start));
    }

    private boolean resourcesAreLow() {
        return true;
    }

    private static <V> Future<V> createFutureValueOnTimeout(V v, Supplier<V> supplier) {
        CompletableFuture<V> completableFuture = CompletableFuture.supplyAsync(supplier);
        return new FutureDecorator<V>(completableFuture) {
            @Override
            public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
                return completableFuture.completeOnTimeout(v, timeout, unit).get();
            }
        };
    }

    private static class FutureDecorator<V> implements Future<V> {
        private final Future<V> inner;

        private FutureDecorator(Future<V> inner) {this.inner = inner;}

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return inner.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return inner.isCancelled();
        }

        @Override
        public boolean isDone() {
            return inner.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return inner.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return inner.get(timeout, unit);
        }
    }
}

输出:(请注意缺少“已中断!”消息

Got: num: -1. time: 1007
5 Calculated!
Finished. Time: 11021

共有1个答案

陶胤
2023-03-14

您可以将支持取消的执行器/未来API与完全未来 组合使用:

public static <R> CompletableFuture<R> withInterruptionSupport(Callable<R> c) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    FutureTask<R> ft = new FutureTask<R>(c) {
        @Override
        protected void set(R v) {
            super.set(v);
            cf.complete(v);
        }
        @Override
        protected void setException(Throwable t) {
            super.setException(t);
            cf.completeExceptionally(t);
        }
    };
    cf.defaultExecutor().execute(ft);
    cf.whenComplete((x,y) -> ft.cancel(true));
    return cf;
}

由于在实际函数中支持中断通常意味着处理中断异常,因此使用可调用而不是供应商,非常方便,因此允许抛出此异常。

只要CompletableFuture完成,支持中断取消的Future就会无条件取消,因为只要完成源于任务本身,未来已经完成,后续取消将被忽略。

这意味着,我们不需要在这里区分不同的完成可能性。< code>completeOnTimeout不仅起作用,您还可以对< code>CompletableFuture调用< code>cancel(…),它将中断< code>Callable的计算(尽管< code>boolean参数仍然是不相关的)。即使在没有等待超时的情况下调用带有替代结果的< code>complete,也会中断现在已经过时的评估。

所以,下面的作品:

for(int timeout: new int[] { 5, 15 }) {
    System.out.println("with timeout of " + timeout);
    Integer i = withInterruptionSupport(() -> {
            Thread.sleep(10000);
            System.out.println("5 Calculated!");
            return 5;
        })
        .completeOnTimeout(42, timeout, TimeUnit.SECONDS)
        .join();
    System.out.println("got " + i);
}
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
with timeout of 5
got 42
with timeout of 15
5 Calculated!
got 5
 类似资料:
  • 在我的应用程序中,我希望有一个轮询循环,它阻塞套接字接收操作,但在100毫秒后超时。这将允许我在需要时退出循环(例如,用户单击UI中的某些内容),同时避免使用busy循环或Thread.Sleep。

  • 我正在使用以下DAO 我的数据库有3列杂货店ID(Long-auto生成的)、杂货店名称(String)和杂货店状态(Int-1/0)。 当我在不使用LiveData的情况下使用getItemsBasedOn状态(状态:Int)时,我能够检索数据。但是当它被LiveData包装时,我变得为空。 另一个问题是,当我从数据库中获取项目列表时,没有使用LiveData包装并在ViewModel中分配给M

  • 我是RxJava的新手。我有一个,用于下载图像URL列表并将其转换为位图的简单网络操作。 在活动的上,我取消订阅可观察到的以避免内存泄漏。 我能看到的是,如果我在图像下载过程中退出活动,后台任务也会被终止。我只想继续做背景工作,即使我已经取消了《观察家》的订阅。有可能吗?

  • 问题内容: 当用Java编程时,我几乎总是出于习惯,编写如下代码: 大多数时候甚至都没有考虑它。现在,问题是:我应该 始终 将接口指定为返回类型吗?还是建议使用接口的实际实现,如果是,在什么情况下使用? 显然,使用该接口有很多优点(这就是为什么它存在的原因)。在大多数情况下,库函数使用哪种具体实现并不重要。但也许在某些情况下确实很重要。例如,如果我知道我将主要随机访问列表中的数据,那将是一个坏习惯

  • app.py reg_account.html 错误: 我想让的结果在中的复选框未选中时返回False,但我不明白为什么当我勾选了该复选框时,结果可以存储到数据库中?我试着调试了几次,但还是找不到一个可能的解决方案,所以有没有人可以帮忙?

  • 我正在运行以下代码,试图删除不存在的“尖叫”: 控制台日志显示以下内容: 我希望函数在404响应时停止执行,但似乎块都是在block之外执行的。为什么会这样?