当前位置: 首页 > 知识库问答 >
问题:

CompletableFutures:并行处理CompletableFutures链

宇文智敏
2023-03-14

我正在用completablefutures设计异步调用。这是一个批处理调用,在这里我需要同时处理几个实体。在呼叫结束时,我必须收集关于每一个项目的处理状态的信息。

作为输入,我有这些实体的ID数组。这是一个复杂的实体,为了将一个实体编译成一个对象,我必须发出几个DAO调用。每个DAO方法都返回completablefuture

我将那些DAO调用链接起来,因为如果其中一个部分不存在,我将无法构造一个完整的对象。下面是我的代码段的样子:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class CfChainsAllOfTest {

    private DAO dao = new DAO();

    public static void main(String[] args) {
        CompletableFuture<Void> resultPrintingCf = new CfChainsAllOfTest().fetchAllInParallelAndCollect(Lists.newArrayList(1l, 2l, 3l)).thenAccept(results -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + results);
        });
        resultPrintingCf.join();
    }

    private CompletableFuture<List<Item>> fetchAllInParallelAndCollect(List<Long> ids) {
        List<CompletableFuture<Item>> cfs = Lists.newArrayList();
        for (Long id : ids) {
            // I want this to be an instant non-blocking operation
            cfs.add(fetchSingle(id));
            System.out.println("[" + Thread.currentThread().getName() + "]" + "After completable future was added to the list, id=" + id);
        }
        return waitAllOfAndCollect(cfs);
    }

    private CompletableFuture<Item> fetchSingle(Long id) {
        return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
    }

    public CompletableFuture<Item> getPartCAndSetOnItem(Item item) {
        return dao.getPartC(item.getId()).thenCompose(partC -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartC(partC);
            cf.complete(item);
            return cf;
        });
    }

    public CompletableFuture<Item> getPartBAndSetOnItem(Item item) {
        return dao.getPartB(item.getId()).thenCompose(partB -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartB(partB);
            cf.complete(item);
            return cf;
        });
    }

    public CompletableFuture<Item> getPartAAndSetOnItem(Item item) {
        return dao.getPartA(item.getId()).thenCompose(partA -> {
            CompletableFuture<Item> cf = new CompletableFuture<>();
            item.setPartA(partA);
            cf.complete(item);
            return cf;
        });
    }

    private static <T> CompletableFuture<List<T>> waitAllOfAndCollect(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream().map(future -> future.join()).collect(Collectors.<T> toList()));
    }

    static class DAO {

        public CompletableFuture<PartC> getPartC(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part C from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part C fetched from db for id=" + id);
                return new PartC();
            });
        }

        public CompletableFuture<PartB> getPartB(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part B from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part B fetched from db for id=" + id);
                return new PartB();
            });
        }

        public CompletableFuture<PartA> getPartA(Long id) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part A from database for id=" + id);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
                System.out.println("[" + Thread.currentThread().getName() + "]" + "Part A fetched from db  for id=" + id);
                return new PartA();
            });
        }

    }

    static class Item {

        private final Long id;

        private PartA partA;
        private PartB partB;
        private PartC partC;

        public Item(Long id) {
            this.id = id;
        }

        public Long getId() {
            return id;
        }

        public PartA getPartA() {
            return partA;
        }

        public void setPartA(PartA partA) {
            this.partA = partA;
        }

        public PartB getPartB() {
            return partB;
        }

        public void setPartB(PartB partB) {
            this.partB = partB;
        }

        public PartC getPartC() {
            return partC;
        }

        public void setPartC(PartC partC) {
            this.partC = partC;
        }

        @Override
        public String toString() {
            return "Item [id=" + id + ", partA=" + partA + ", partB=" + partB + ", partC=" + partC + "]";
        }

    }

    static class PartA {
        @Override
        public String toString() {
            return "Part A";
        }

    }

    static class PartB {
        @Override
        public String toString() {
            return "Part B";
        }
    }

    static class PartC {
        @Override
        public String toString() {
            return "Part C";
        }
    }

}

问题是,由于链接的关系,每个项目的处理并不是真正并行完成的。看起来completablefutures的链接是一个阻塞调用。我希望CFs链立即返回completablefuture 变量,并且只有在开始计算值之后才返回。

这就是说,什么是实现这种行为的最佳方式呢?多谢了。

共有1个答案

戎元忠
2023-03-14

这个方法的问题是:

private CompletableFuture<Item> fetchSingle(Long id) {
    return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
}

基本上你是在说:得到C部分,然后得到B部分,然后得到A部分。

相反,您应该调用这3个方法,然后合并结果--尽管由于将结果存储在item上的方式,在这里可能没有必要这样做(注意这里的Java内存模型,因为您的item在这里没有同步:对于更复杂的示例,它可能无法正常工作)。

所以,基本上:

private CompletableFuture<Item> fetchSingle(Long id) {
    Item result = new Item(id);
    CompletableFuture<?> c = getPartCAndSetOnItem(result);
    CompletableFuture<?> b = getPartBAndSetOnItem(result);
    CompletableFuture<?> a = getPartAAndSetOnItem(result);
    return CompletableFuture.allOf(a, b, c).thenApply(__ -> result);
}

当然,缺点是,即使一个呼叫失败,你也要执行全部3个呼叫,但你不能鱼与熊掌兼得…

另外,您的GetPartXandSetonItem()方法可以简化为

public CompletableFuture<Item> getPartXAndSetOnItem(Item item) {
    return dao.getPartX(item.getId()).thenApply(partX -> {
        item.setPartX(partX);
        return item;
    });
}

或者,考虑到我们不关心fetchsingle()中的实际结果类型:

public CompletableFuture<?> getPartXAndSetOnItem(Item item) {
    return dao.getPartX(item.getId()).thenRun(item::setPartX);
}
 类似资料:
  • 我有一个将一个项目映射到它的属性,例如,其中属性是从不同的数据源检索的。 例如,我们从数据库中获得属性,而从Solr中获得属性。 当我最初从DB检索时,我使用以避免阻塞主线程,如下所示: 然后通过对Solr的异步调用将其链接起来,这样我最终将有一个异步Hashmap将项映射到它们的属性,即(因此我循环遍历Hashmap的键,并用新属性更新值,使用访问旧属性)。 最后,我将数据映射到csv,这就是问

  • 通常,对于CompletableFuture,我会调用thenApply或它的其他方法,以便在结果可用时立即执行某些操作。然而,我现在有一种情况,我想处理结果,直到我收到一个肯定的结果,然后忽略所有进一步的结果。 如果我只是想获取第一个可用的结果,我可以使用CompletableFuture.anyOf(尽管我讨厌为了调用anyOf而将列表转换为数组)。但那不是我想要的。我想取第一个结果,如果它没

  • 我在某个存储库类上有一个方法,它返回。完成这些期货的代码使用一个第三方库来阻止。我打算有一个单独的有界,这个存储库类将使用它来进行这些阻塞调用。 这里有一个例子: 我的应用程序的其余部分将组成这些期货,并用结果做一些其他的事情。当提供给、、等的其他函数时,我不希望它们在存储库的上运行。 另一个例子: JavaDoc声明: 为非异步方法的从属完成提供的操作可以由完成当前CompletableFutu

  • 现在,当我们知道提交给的工作是一个长时间运行的操作时,我们需要传递一个自定义执行器(否则默认情况下它将在上执行)。 每个控制器执行都有一个上下文,其中包含所有请求信息。如果使用,······ 如果我们只是创建自定义的并将其注入到控制器中,以便在中使用,那么我们将不会拥有所有的上下文信息。 下面是返回的某个控制器操作的示例 } 如果我们尝试在中运行类似的内容 使用CompletableFuture和

  • 我现在有点困惑,所以我有一个方法应该返回

  • 我想向Web服务并行发送多个请求。结果应该由成功错误收集,然后可以由调用者进一步分析。 问题:我怎样才能收集所有响应,同时也收集在方法中抛出的异常? 我的目标不是返回两个列表:一个包含所有成功的响应,另一个包含错误。