当前位置: 首页 > 知识库问答 >
问题:

试着等待一个完整的未来

商开济
2023-03-14

是否有一种方法可以尝试等待completablefuture一段时间,然后返回不同的结果,而不取消超时后的未来?

我有一个服务(我们称之为expensiveservice),它跑出去做自己的事情。它返回一个结果:

enum Result {
    COMPLETED,
    PROCESSING,
    FAILED
}

我愿意[阻止并]等待它一小段时间(比方说2秒)。如果它没有完成,我希望返回一个不同的结果,但我希望服务继续做它自己的事情。然后查询服务是否完成(例如,通过websockets或其他方式)将是客户端的工作。

即。我们有以下几个案例:

  • ExpensiveService.ProcessAndGet()花费%1 s并完成其未来。它返回已完成
  • ExpensiveService.ProcessAndGet()在%1 s后失败。它返回失败
  • ExpensiveService.ProcessAndGet()需要5秒并完成它的将来。它返回processing。如果我们向另一个服务询问结果,则会得到completed
  • ExpensiveService.ProcessAndGet()在5秒后失败。它返回processing。如果向另一个服务询问结果,则会得到failed

在这个特定的情况下,我们实际上需要在超时时提取当前的结果对象,从而产生以下附加的edge-case。这会导致以下建议的解决方案出现一些问题:

  • ExpensiveService.ProcessAndGet()花费了2.01s并完成了它的将来。它返回processingcompleted

我也在使用Vavr并且愿意接受使用Vavr的future的建议。

我们提出了三种可能的解决办法,它们各有利弊:

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            Thread.sleep(2000);
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());
  1. 始终调用第二个resultservice
  2. 我们占用整个html" target="_blank">线程2秒。
CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            int attempts = 0;
            int timeout = 20;
            while (!f.isDone() && attempts * timeout < 2000) {
                Thread.sleep(timeout);
                attempts++;
            }
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());
  1. 仍然始终调用第二个ResultService
  2. 我们需要将第一个未来传递给第二个未来,这不太干净。
Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
    process.whenComplete((r, e) -> {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    });
    try {
        int attempts = 0;
        int timeout = 20;
        while (!process.isDone() && attempts * timeout < 2000) {
            monitor.wait(timeout);
            attempts++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (process.isDone()) {
    return process.toCompletableFuture();
} else {
    return CompletableFuture.completedFuture(resultService.get(processId));
}
  1. 复杂代码(可能存在错误,不可读)。
return Future.of(() -> expensiveService.processAndGet()
        .await(2, TimeUnit.SECONDS)
        .recoverWith(e -> {
            if (e instanceof TimeoutException) {
                return Future.successful(resultService.get(processId));
            } else {
                return Future.failed(e);
            }
        })
        .toCompletableFuture();
    null
return expensiveService.processAndGet()
        .orTimeout(2, TimeUnit.SECONDS)
        .<CompletableFuture<Upload>>handle((u, e) -> {
            if (u != null) {
                return CompletableFuture.completedFuture(u);
            } else if (e instanceof TimeoutException) {
                return CompletableFuture.completedFuture(resultService.get(processId));
            } else {
                return CompletableFuture.failedFuture(e);
            }
        })
        .thenCompose(Function.identity());
  1. 虽然在我的情况下,processandget未来没有取消,但根据文档,应该取消。
  2. 异常处理不好。
return expensiveService.processAndGet()
        .completeOnTimeout(null, 2, TimeUnit.SECONDS)
        .thenApply(u -> {
            if (u == null) {
                return resultService.get(processId);
            } else {
                return u;
            }
        });
  1. 虽然在我的情况下,processandget未来没有完成,但根据文档,它应该完成。
  2. 如果ProcessAndGet希望以不同状态返回Null怎么办?

所有这些解决方案都有缺点,并且需要额外的代码,但这感觉像是completablefuture或VAVR的future都应该支持的东西。有没有更好的办法做到这一点?

共有1个答案

华星文
2023-03-14

首先值得指出的是,completablefuture是如何工作的(或者为什么这样命名):

CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);

基本等同于

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            f.complete(supplier.get());
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});

completablefutureexecutor正在执行的代码没有连接,事实上,我们可以有任意数量的正在进行的完成尝试。一个特定的代码旨在完成completablefuture实例,这一事实只有在调用一个完成方法时才变得明显。

方法cancel与completeexceptionary(new CancellationException())具有相同的效果

所以一个取消只是另一个完成尝试,如果是第一个就会赢,但不影响任何其他完成尝试。

所以ortimeout(长超时,TimeUnit单位)在这方面没有太大区别。超时后,它将执行与CompleteExceptionary(new TimeoutException())等价的操作,如果没有其他更快的完成尝试,则该操作将获胜,这将影响从属阶段,但不会影响其他正在进行的完成尝试,例如ExpensiveService.ProcessAndGet()在您的情况下启动了什么。

您可以实现所需的操作,如

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
    () -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
    .whenComplete((u,t) -> alternative.cancel(false));

对于delayedexecutor,我们使用的工具与ortimeoutcompleteontimeout相同。如果Future.WhenComplete中的取消更快,则在指定时间之前不评估指定的供应商或根本不评估指定的供应商applyToEnty将提供任何可用的更快的结果。

这不会在超时时完成future,但如前所述,它的完成无论如何不会影响原始计算,因此这也会起作用:

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
    .execute(() -> {
        if(!future.isDone()) future.complete(resultService.get(processId));
    });
return future;

如前所述,这将在超时后完成未来,不会影响正在进行的计算,但会向调用方提供替代结果,但不会将ResultService.get(processId)引发的异常传播到返回的未来。

 类似资料:
  • 我要做的是异步计算树结构的深度,我将有树的第一层,我想启动一个异步线程来分别计算每个节点的深度。 在计算过程中,树中显然可能有一个分叉,在这一点上,我想踢一个额外的线程来计算那个分支。 我已经得到了这个工作,但我需要做一些整理逻辑,当所有这些未来完成。但我对这一过程中产生的额外的可完成的未来感到困扰。 我会用什么方法来保存所有开始的CompletableFutures+那些动态创建的,并且在执行任

  • 这段代码适用于发送数据并关闭连接的客户机,但是当使用一个连接的客户机多次发送时,数据没有被读取->我应该在读取完整正文后关闭连接吗?

  • 我想要一个完整的未来,只发出完成的信号(例如,我没有返回值)。 我可以将CompletableFuture实例化为: 但是我应该向完整的方法提供什么呢?例如,我不能做

  • 是否有一种方法可以在不阻塞事件循环的情况下等待一个未来完成? 我知道这个错误通常意味着什么,但我不知道在这种情况下...我试图在谷歌上搜索它,但没有找到任何关于将哪份清单放在哪里的明确解释。和以前一样,除非是强制性的,我更喜欢一次学一件事。 那么,回到这个问题:“基本”Vert.x是否有一种方法可以在事件循环不受干扰的情况下等待未来?

  • 我想运行相同类型的任务(工作线程),但一次不超过一定数量的任务。当任务完成时,其结果是新任务的输入,然后可以启动该任务。 有没有好的方法可以在C 11中使用异步/未来范式来实现这一点? 乍一看,它看起来很简单,你只是生成多个任务: 然后,运行以获取任务的异步结果。 然而,这里的问题是,未来的对象必须存储在某种队列中并一个接一个地等待。但是,可以一遍又一遍地迭代未来的对象,检查它们中的任何一个是否准

  • 想改进这个问题吗 通过编辑这篇文章,更新问题,以便用事实和引文来回答。 我调用一个方法,该方法为列表中的每个元素返回一次未来 返回的方法是库代码,我无法控制该代码的运行方式,我所拥有的只是。 我想等待所有完成(成功或失败),然后再继续。 有没有比这更好的方法: 对于好奇的人: 与这个等待未来列表的答案不同,我无法控制创建未来的代码。