当前位置: 首页 > 知识库问答 >
问题:

CompletableFuture:运行期货列表、等待结果和处理异常的正确方法

索嘉胜
2023-03-14

我有一个遗留代码,它有十几个数据库调用来填充报表,这需要大量的时间,我试图使用completablefuture来减少这些时间。

我有些怀疑自己做事正确,没有过度使用这项技术。

我的代码现在看起来是这样的:

>

  • 在每个方法中使用多个数据库调用启动文档节的异步填充

    CompletableFuture section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
    CompletableFuture section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
        ...
    CompletableFuture section1oFuture = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
    

    然后,我在arraylist中按特定顺序排列期货,并将所有期货连接起来,以确保只有当所有期货完成时,我的代码才会进一步运行。

    List<CompletableFuture> futures = Arrays.asList(
                section1Future,
                section2Future, ...
                section10Future);
    
    List<Object> futureResults = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    
    Optional.ofNullable((PdfPTable) futureResults.get(0)).ifPresent(el -> populatePdfElement(document, el));
    Optional.ofNullable((PdfPTable) futureResults.get(1)).ifPresent(el -> populatePdfElement(document, el));
        ...
    Optional.ofNullable((PdfPTable) futureResults.get(10)).ifPresent(el -> populatePdfElement(document, el));
    

    退货单

    我关注的问题是:

    1)以这种方式创建和实例化许多可完成的期货可以吗?在arraylist中按照所需的顺序排列它们,将它们连接起来以确保它们全部完成,然后将它们转换到特定对象中得到结果?

    2)在不指定执行器服务的情况下运行,而依赖于通用的forkjoinpool可以吗?然而,这段代码是在web容器中运行的,所以为了使用JTA,我可能需要通过JNDI使用容器提供的线程池执行器?

    3)如果这段代码被包围在try-catch中,我应该能够在主线程中捕获completionexception,对吗?或者为了做到这一点,我应该声明每个特性,如下所示:

    CompletableFuture.supplyAsync(() -> populateSection1(arguments))
        .exceptionally (ex -> {
                        throw new RuntimeException(ex.getCause());
            });
    

    4)是否有可能过度使用CompletableFuture,从而使它们本身成为性能瓶颈?就像许多futures等待一个执行器开始运行一样?如何避免这种情况?使用容器提供的执行器服务?如果是,请有人给我指出一些最佳实践,如何正确配置执行器服务,考虑到处理器和内存数量?

    5)内存影响。我在并行线程中读到,当许多对象被创建和垃圾收集时,可能会有问题。有没有一个最佳实践来计算应用程序所需的正确内存数量?

  • 共有1个答案

    齐阎宝
    2023-03-14

    这种做法总体上没有错,但也有需要改进的地方。

    最值得注意的是,您不应该使用原始类型,如completablefuture

    populatesection…返回pdfptable时,您应该在整个代码中一致地使用usecompletablefuture

    即。

    CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(()  -> populateSection1(arguments));
    CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(()  -> populateSection2(arguments));
        ...
    CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
    

    即使这些方法没有声明您假定总是在运行时返回的返回类型,您也应该在这个早期阶段插入类型转换:

    CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(()  -> (PdfPTable)populateSection1(arguments));
    CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(()  -> (PdfPTable)populateSection2(arguments));
        ...
    CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection10(arguments));
    
    Stream.of(section1Future, section2Future, ..., section10Future)
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .forEachOrdered(el -> populatePdfElement(document, el));
    

    通过不使用原始类型,您已经获得了所需的结果类型,并且可以在该流操作中执行第三步的操作,即筛选和执行最终操作。

    如果您仍然需要列表,您可以使用

    List<PdfPTable> results = Stream.of(section1Future, section2Future, ..., section10Future)
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
    
    results.forEach(el -> populatePdfElement(document, el));
    

    也就是说,并行性取决于用于操作的线程池(指定给supplyAsync)。当您没有指定执行器时,您得到的是并行流使用的默认fork/join池,因此在这个特定的情况下,您得到的结果与

    List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
        ()  -> populateSection1(arguments),
        ()  -> populateSection2(arguments));
        ...
        () -> populateSection10(arguments)))
        .parallel()
        .map(Supplier::get)
        .filter(Objects::nonNull)
        .forEachOrdered(el -> populatePdfElement(document, el));
    
    List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
        ()  -> populateSection1(arguments),
        ()  -> populateSection2(arguments));
        ...
        () -> populateSection10(arguments)))
        .parallel()
        .map(Supplier::get)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
    
    results.forEach(el -> populatePdfElement(document, el));
    

    关于异常处理,当您调用CompletableFuture::Join时,您将得到包装在CompletionException中的供应商引发的任何异常。在例外情况下(ex->{throw new RuntimeException(ex.getcause());});没有意义,当您调用completablefuture::join时,新的RuntimeException也将包装在completionexception中。

    在Stream变体中,您将在没有包装器的情况下获得异常。由于supplier不允许检查异常,因此只能使用runtimeexceptionerror的子类型。

    其他问题对于问答来说太宽泛了。

     类似资料:
    • 本文向大家介绍PHP ajax 异步执行不等待执行结果的处理方法,包括了PHP ajax 异步执行不等待执行结果的处理方法的使用技巧和注意事项,需要的朋友参考一下 短地址生成应用中,要根据长地址生成网页快照,这个生成时间非瞬发,不可预估。 所以前台方面采用的方案一般为先展示生成的短地址,再定期AJAX轮查网页快照是否生成完毕。 So,PHP代码这里做了如下处理: 前台Js的ajax脚本: 回调函数

    • 我是Scala未来的新手,我还没有找到问题的解决方案。我正在努力实现以下目标(总体描述:努力获取一个酒店列表的客人列表,分别查询每个酒店): < li >对另一个API进行n次调用,每次调用都超时 < li >合并所有结果(将列表转换为包含所有元素的列表) < li >如果单个调用失败,记录错误并返回一个空列表(本质上,在这种情况下,如果我得到部分结果总比没有结果好) < li >理想情况下,如果

    • 如何等待期货列表15分钟(如果未完成)而不是每个未来?下面的代码将为每个未来等待15分钟。但这不是我想要的

    • 假设我有这个示例代码,在中遇到了一个异常。我的问题是,这个异常是否会阻止作为在与此代码的调用者方法相同的线程中运行。 我已经浏览了这个主题,它解释了如何处理从异步块抛出的异常(即通过阻塞和使用)。我想知道如果”,块中的代码是否会被执行。 更新: 我运行了一些代码来测试: 它不会打印任何内容,这意味着异常不会从异步块“传播到/到达”调用方方法的线程。我现在了解这里的预期行为,但我有以下问题: 为什么

    • 我试图了解gRPC中的异常处理机制是如何工作的。 除了try-catch块之外,还有其他方法来处理运行时异常,例如服务器端的IllegalArgumentException吗? 例如,我有一些gRPC流式客户端服务,当传递的参数不满足深层次的一些断言时(在某些库中,例如,com.google.common.base.Preconditions),方法onNext抛出IllegalArgumentE

    • 本文向大家介绍Java中断异常的正确处理方法,包括了Java中断异常的正确处理方法的使用技巧和注意事项,需要的朋友参考一下 处理InterruptedException 这个故事可能很熟悉:你正在写一个测试程序,你需要暂停某个线程一段时间,所以你调用 Thread.sleep()。然后编译器或 IDE 就会抱怨说 InterruptedException 没有抛出声明或捕获。什么是 Interru