当前位置: 首页 > 知识库问答 >
问题:

在Java,我如何处理CompletableFutures并得到第一个完成的预期结果?

狄河
2023-03-14

通常,对于CompletableFuture,我会调用thenApply或它的其他方法,以便在结果可用时立即执行某些操作。然而,我现在有一种情况,我想处理结果,直到我收到一个肯定的结果,然后忽略所有进一步的结果。

如果我只是想获取第一个可用的结果,我可以使用CompletableFuture.anyOf(尽管我讨厌为了调用anyOf而将列表转换为数组)。但那不是我想要的。我想取第一个结果,如果它没有理想的结果,那么我想处理第二个可用的结果,依此类推,直到得到理想的结果。

下面是一个简单的示例,它遍历所有结果,并返回它找到的第一个大于9的值。(注意,这不是我真正的任务,这只是一个简单的例子。)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    for(CompletableFuture<Integer> result : results) {
        Integer v = result.get();
        if(v > 9)
            return v;
    }
    return null;
}

当然,该示例从一开始就查看结果,而不是在结果完成时查看结果。这里有一个实现了我想要的,但代码要复杂得多。

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    AtomicInteger finalResult = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(results.size());
    for(CompletableFuture<Integer> result : results) {
        result.whenComplete((v,e) -> {
            if(e!=null) {
                Logger.getLogger(getClass()).error("",e);
            } else if(v > 9) {
                finalResult.set(v);
                while(latch.getCount() > 0)
                    latch.countDown();
                return;
            }
            latch.countDown();
        });
    }
    latch.await();

    if(finalResult.get() > 9)
        return finalResult.get();
    return null;
}    

有没有一个api我可以这样做?

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    Iterator<Integer> resultIt = getResultsAsAvailable(results);
    for(; resultIt.hasNext();) {
        Integer v = resultIt.next();
        if(v > 9)
            return v;
    }
    return null;
}

或者更好:

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) {
    return getFirstMatch(results, r -> {return r > 9;});
}

共有2个答案

夹谷星剑
2023-03-14

我不知道在JDK或其他地方有任何这样的API。你可以自己滚。

如果未来已经完成,completablefuture#complete(和completeexceptional)不会执行任何操作,您可以利用这一事实。

如果尚未完成,则将get()和相关方法返回的值设置为给定值。

创建新的最终结果completablefuture。如果条件适用,将尝试complete此最终结果的每个futures添加一个continuation。未来将以第一次成功而告终。但是,如果没有成功,您显然需要null作为结果。您可以使用allof创建completablefuture来尝试使用null来完成最终结果。

类似于

public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) {
    CompletableFuture<T> finalResult = new CompletableFuture<>();
    // attempt to complete on success
    futures.stream().forEach(future -> future.thenAccept(successResult -> {
        if (condition.test(successResult))
            finalResult.complete(successResult);
    }));
    CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    all.thenRun(() -> {
        finalResult.complete(null);
    });
    return finalResult;
}

您需要支付无操作调用的开销。

您可以根据需要将null更改为某些默认值,或者以不同的方式处理异常(completeexceptionary)。您必须使用whencompletehandle而不是上面的thenaccept来访问exception

陆安国
2023-03-14

您可以使用以下解决方案:

public static <T> CompletableFuture<T> anyMatch(
    List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) {

    CompletableFuture<T> result=new CompletableFuture<>();
    Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); };
    CompletableFuture.allOf(l.stream()
        .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new))
    .whenComplete((ignored, t) ->
        result.completeExceptionally(t!=null? t: new NoSuchElementException()));
    return result;
}

基本原理与Pillar的回答中相同,然而,有一些不同之处:

  • 泛型签名更灵活。
  • 创建CompletableFuture.AllOf所需的数组与注册源Futures的后续操作相结合。作为一个副作用,allof操作的处理程序依赖于完成所有尝试来完成结果,而不是仅依赖于原始的期货。这使得实际所需的依赖关系显式化。这样,当我们用thenAccept替换所有thenAcceptAsync时,它甚至可以工作。
  • 此解决方案以nosuchelementException完成,而不是在没有结果满足条件的情况下返回null。如果至少有一个future异常完成,并且没有匹配结果的成功完成,则中继发生的一个异常。

你可以用

List<CompletableFuture<Integer>> list=Arrays.asList(
    CompletableFuture.supplyAsync(()->5),
    CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }),
    CompletableFuture.supplyAsync(()->42),
    CompletableFuture.completedFuture(0)
);
anyMatch(list, i -> i>9)
    .thenAccept(i->System.out.println("got "+i))
    // optionally chain with:
    .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); });
 类似资料:
  • 我正在用s设计异步调用。这是一个批处理调用,在这里我需要同时处理几个实体。在呼叫结束时,我必须收集关于每一个项目的处理状态的信息。 作为输入,我有这些实体的ID数组。这是一个复杂的实体,为了将一个实体编译成一个对象,我必须发出几个DAO调用。每个DAO方法都返回。 我将那些DAO调用链接起来,因为如果其中一个部分不存在,我将无法构造一个完整的对象。下面是我的代码段的样子: 问题是,由于链接的关系,

  • 我查看了文档并找到了invokeAny ExecutorService,但这将返回任何已成功完成的线程的结果,而不一定是第一个线程。

  • 其中是不可变的,如下所示: 我现在想要一个未来,当其中一个未来: 完成;和 实例的 返回 在继续操作时,我需要有可用的实例。换句话说,我想要一个,它在满足这两个条件时完成。 我怎么能这么做?我知道可以使用在其中一个期货完成时继续执行,但我不确定如何集成第二个需求:必须为true。

  • 我想知道特定月份/年份的第一个星期一的日期。 我所拥有的: 我基本上有两个int变量,一个代表年份,一个代表月份。 我想要的是: 我想知道这个月的第一个星期一,最好是整数或整数值。 例如: 我有2014年和1月,这个月的第一个星期一是6号,所以我想返回6号。 问题: 我原以为我可以用做到这一点,但我在设置日历时遇到了问题,因为只有年份和月份可用。此外,我不知道如何使用返回月份/年份的第一个星期一。

  • 我最近开始学习改装和rxjava。我在寻找关于如何等待和获得第一次观察结果的任何想法。基本上,我想在一个简单的登录上应用它。第一个api调用是获取服务器时间。第二个api调用将等待第一个调用的结果(即服务器时间)并利用它。 现在,我被困在这里了。在第二个可观察对象上,特别是在getNetworks方法上,我想使用我从第一个可观察对象得到的东西。有什么想法吗? 编辑: 我想先处理call 1的结果,

  • 问题内容: 我知道callable的调用可能会将异常抛出给调用它的父方法,而runnable则不是这种情况。 我不知道如何,因为它是线程方法,并且是线程堆栈的最底层方法。 问题答案: 的要点是将异常抛出到调用线程,例如,当您获得提交的结果时。