当前位置: 首页 > 知识库问答 >
问题:

如何在CompletableFutures中收集成功和错误?

颜高朗
2023-03-14

我想向Web服务并行发送多个请求。结果应该由成功错误收集,然后可以由调用者进一步分析。

public Map.Entry<Rsp, Errors> sendApiRequests(List<Req> reqs) {
    //will mostly remain null as errors won't occur frequently
    List<Rsp> errors = null;

    List<CompletableFuture<Rsp>> futures =
            reqs.stream()
                .map(req -> CompletableFuture.supplyAsync(() -> send(req))
                        .exceptionally(ex -> {
                            //TODO this fails, because list should be final for it.
                            //but don't want to instantiate as mostly will remain just null
                            if (errors == null) errors = new ArrayList<>();
                            errors.add(req);
                        }))
                .collect(Collectors.toList());

    //send api requests in parallel
    List<Rsp> responses = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

    //TODO how to collect the errors? each error should also provide the underlying Req that caused the failure.
    //pending requests should not be aborted if any throwns an exception
    return new SimpleEntry(responses, errors);
}

问题:我怎样才能收集所有响应,同时也收集在send()方法中抛出的异常

我的目标不是返回两个列表:一个包含所有成功的响应,另一个包含错误。

共有1个答案

柳培
2023-03-14

虽然这是一个老问题,但我需要类似的东西,并且能够从上面的评论和其他不太正确的互联网片段中拼凑出一个可行的解决方案。

方法和CompletableFutureCollector类将为每个请求返回一个响应或错误列表。这在Java11中实现,但应该适用于Java8。我建议将其调整为传入java.util.concurrent.Execator以控制并行度。例如,您可以像这样使用它:

    final List<CompletableFutureCollector.CollectorResult<Rsp>> results =
            sendApiRequests(List.of(new Req()));
    results.stream()
            .filter(CompletableFutureCollector.CollectorResult::hasError)
            .map(CompletableFutureCollector.CollectorResult::getError)
            .forEach(error -> {
                // Do something with errors
            });
    results.stream()
            .filter(CompletableFutureCollector.CollectorResult::hasResult)
            .map(CompletableFutureCollector.CollectorResult::getResult)
            .forEach(rsp -> {
                // Do something with responses
            });
    public List<CompletableFutureCollector.CollectorResult<Rsp>> sendApiRequests(List<Req> reqs) {
        // The actual send implementation could be anything that you'd like to do asynchronously  
        return CompletableFutureCollector.mapAsyncAndCollectResult(reqs, req -> send(req));
    }

    // ...

    private final static class CompletableFutureCollector {
        private CompletableFutureCollector() {
        }

        public static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult() {
            return Collectors.collectingAndThen(Collectors.toList(), joinResult());
        }

        private static <X, T extends CompletableFuture<X>> Function<List<T>, CompletableFuture<List<X>>> joinResult() {
            return futures -> allOf(futures)
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        }

        private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOf(final List<T> futures) {
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }

        public static <T, R> List<CompletableFutureCollector.CollectorResult<R>> mapAsyncAndCollectResult(
                final Collection<T> items, final Function<T, R> action) {
            return items.parallelStream()
                    .map(task -> CompletableFuture.supplyAsync(() -> action.apply(task)))
                    .map(CompletableFutureCollector.CollectorResult::handle)
                    .collect(CompletableFutureCollector.collectResult())
                    .join();
        }

        private final static class CollectorResult<R> {
            private final R result;
            private final Throwable error;

            private CollectorResult(final R result, final Throwable error) {
                this.result = result;
                this.error = error;
            }

            public boolean hasError() {
                return getError() != null;
            }

            public boolean hasResult() {
                return !hasError();
            }

            public R getResult() {
                return result;
            }

            public Throwable getError() {
                return error instanceof CompletionException ? error.getCause() : error;
            }

            public static <R> CompletableFuture<CompletableFutureCollector.CollectorResult<R>> handle(final CompletableFuture<R> future) {
                return future.handle(CompletableFutureCollector.CollectorResult::new);
            }
        }
    }

注意:我还没有完全测试这个解决方案,因为它是从一个有一些额外的快速故障消除逻辑的工作实现中改编的。

 类似资料:
  • 我对此进行了研究,发现了许多集成CKEditor的方法,但我在集成CKFinder时遇到了问题。我的网站是由其他人创建的,但我觉得我可以自己集成这个。 该网站包含文章,我有一个使用CKEditor的“视图”。它工作得很好,但我无法将图像上传到服务器或浏览服务器图像以在CKEditor中插入照片。因此,我无法在文章文本中添加照片。我希望CKFinder能解决这个问题。 我已经上传了CKFinder文

  • 问题内容: 如何在视图中收集错误? 我不想使用HTML Helper验证摘要或验证消息。相反,我想检查是否有错误,是否有错误以特定格式显示。另外,在输入控件上,我想检查特定的属性错误并将类添加到输入中。 PS我正在使用Spark View Engine,但想法应该是相同的。 所以我想我可以做… 或类似的东西。 更新 我的最终解决方案如下所示: 如果此属性有错误,则仅添加错误CSS类。 问题答案:

  • 假设我的配置单元表包含以下值: 我正在使用。我在collect_list/collect_set或group_concat查询后出现此错误。 错误:org。阿帕奇。蜂箱服务cli。HiveSQLException:处理语句时出错:失败:执行错误,从组织返回代码2。阿帕奇。hadoop。蜂箱ql.exec。org的MapRedTask先生。阿帕奇。蜂箱服务cli。活动活动toSQLException

  • null 如何在transform()步骤中添加Jaxb2Marshaller?

  • 我有build.xml专门用于为我的项目运行junit测试用例现在我必须将它与jacoco代码覆盖集成,我已经将jacaco ant.jar放在ant home libe路径中,现在你能告诉我如何集成或者如何将下面的目标更改为与jacoco相同吗?我的目标如下所示…

  • 我有一个框架,它是用来创建的核心Java+TestNG。然后是TDD模型,POM是我们的构建管理工具。有人能告诉我是否有可能使用Cucumber将框架从TDD更新到BDD。但是我仍然希望对此需求做最小的更改,比如不更改任何现有的技术(核心语言、TestNG、Maven Sys.)。我的目标是如何在Eclipse中使用testng.xml/testng插件运行cucumber TC。在Jenkin