当前位置: 首页 > 知识库问答 >
问题:

Java-CompletableFutures-如果有例外,我如何取消所有的futures

高夜洛
2023-03-14

我有一个方法(包含在下面)来返回completablefuture列表的值。

该方法应该是:

  1. 能够在给定时间后超时。
  2. 如果例外情况超过n个,则可以取消所有期货。

第一个点工作得很好,在它超过超时限制后确实会弹出。(之后我仍然需要调用exectuorservice.shutdownNow()以返回给调用方)。我遇到的问题是我要完成的第二件事。

假设我有一个20,000个期货的列表,所有的期货都有一个例外,那么为什么要让所有的期货都执行,如果我看到有太多的例外,那么我假设所有的期货都有问题,我想取消它们。

此外,我希望有一个超时的每一个未来的个人可能需要多长时间,但这也不会工作,谦虚的原因概述如下。

原因似乎是,当我调用AllDoNefuture.ThenApply()时,此时它会等待并让所有的未来完成,或者成功完成,或者异常完成。只有在所有这些都完成之后,它才会穿越每一个未来,并获得它的结果。在这一点上,取消它有什么好处,当他们已经完成。

如果有人能告诉我如何完成这个特定的需求,我将非常感激:“监视异常和个别超时,并在此基础上取消所有其他超时”。

多谢了。

下面是我写的方法:

/**
     * @param futures a list of completable futures
     * @param timeout how long to allow the futures to run before throwing exception
     * @param timeUnit unit of timeout
     * @param allowedExceptions how many of the futures do we tolerate exceptions,
     * NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
     * */
    public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[futures.size()]));
        try {
            AtomicInteger exceptionCount = new AtomicInteger(0);
            return allDoneFuture.thenApply(v ->//when all are done
                    futures.stream().
                            html" target="_blank">map(future -> {
                                try {
                                    //if only I could set an individual timeout
                                    return future.get(timeout, timeUnit);
                                } catch (Exception e) {
                                    future.cancel(true);
                                    int curExceptionCnt = exceptionCount.incrementAndGet();
                                    if(curExceptionCnt >= allowedExceptions){
                                        //I would've hoped that it will throw it to the calling try-catch 
                                        //and then cancel all futures, but it doesn't
                                        throw new RuntimeException(e);
                                    }
                                    else{
                                        return null;
                                    }
                                }
                            }).
                            collect(Collectors.<T>toList())
            ).get(timeout, timeUnit);
        } catch (Exception e) {
            allDoneFuture.cancel(true);
            throw new RuntimeException(e);
        }
    }

共有1个答案

束俊材
2023-03-14

要在一定数量的异常之后取消所有剩余的期货,您可以对每个异常调用exceptionary,增加异常计数,并可能在其中取消它们。

对于单独的超时,您可以创建一个类,使用它的超时来保存未来,然后根据超时对它们进行排序,并调用get并使用超时减去经过的时间。

static class FutureWithTimeout<T> {
    CompletableFuture<T> f;
    long timeout;
    TimeUnit timeUnit;

    FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
        this.f = f;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }
}
public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
    AtomicInteger exceptionCount = new AtomicInteger(0);
    futures.forEach(f -> f.f.exceptionally(t -> {
        if(exceptionCount.getAndIncrement() == allowedExceptions){
            futures.forEach(c -> c.f.cancel(false));
        }
        return null;
    }));
    long t = System.nanoTime();
    return futures.stream()
        .sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
        .map(f -> {
            try {
                return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)), 
                    TimeUnit.NANOSECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                f.f.cancel(false);
                return null;
            }
        })
        .collect(Collectors.toList());
}

请注意,这可能会以与传入的顺序不同的顺序返回列表。如果您需要相同的顺序,那么您可以将map().collection()更改为foreachordered,然后在不排序的情况下将它们重新映射到它们的结果中。

此外,cancelmayinterruptifrunning参数completablefuture没有影响,因此我将其更改为false。

 类似资料:
  • 问题内容: 我有 如果正在使用entitymanager,如何检索会话?如何从分离标准中获取结果? 问题答案: 为了完全详尽无遗,如果您使用的是JPA 1.0或JPA 2.0实现,则情况有所不同。 JPA 1.0 对于JPA 1.0,您必须使用。但是请记住, 此方法的结果是特定 于 实现的, 即从使用Hibernate的应用程序服务器到其他服务器之间不可移植。例如,使用JBoss,您可以执行以下操

  • 我想构建一个简单的消费者程序(java ),以获取ActiveMQ主题中存储的所有消息。我有一个在队列中发送文本消息的生成器。 但我不知道如何开始写我的消费者来检索旧消息并等待新消息。 如果你有一个例子,谢谢! 这是我的制片人:http://pastebin.com/uRy9D8mY 这是我的消费者:http://pastebin.com/bZh4r66e 当我先于消费者运行生产者,然后运行消费者

  • 问题内容: 我遇到了这个问题,我不知道需要迭代其实际值的实际类型。 任何想法如何从enumValue中提取其可能的值? 问题答案:

  • 如何取消/中止angular 4中所有挂起的HTTP请求。 有一个取消HTTP请求的方法,但是如何一次取消所有挂起的请求。 尤其是在改变路线的时候。 我做了一件事 但是如何在全球范围内实现这一点 有什么想法吗?

  • 问题内容: 是否有一种快速的方法可以从中的所有表获取所有列名,而不必列出所有表? 问题答案:

  • 问题内容: 我今天一直在测试Javascript CSS函数,并注意到当使用.style.cssText时,它仅提供了我用JS设置的CSS。 相反,我想获取元素的所有CSS,所以我猜我在做错什么,或者可能需要另一个函数,例如getComputedStyle,但要使用整个CSS而不是单个属性值,但是我找不到搜索时需要的东西。 所以我的问题是如何从代码的最后部分获得完整的CSS,例如: 而不是当前输出