我正在用completablefuture
s设计异步调用。这是一个批处理调用,在这里我需要同时处理几个实体。在呼叫结束时,我必须收集关于每一个项目的处理状态的信息。
作为输入,我有这些实体的ID数组。这是一个复杂的实体,为了将一个实体编译成一个对象,我必须发出几个DAO调用。每个DAO方法都返回completablefuture
。
我将那些DAO调用链接起来,因为如果其中一个部分不存在,我将无法构造一个完整的对象。下面是我的代码段的样子:
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
public class CfChainsAllOfTest {
private DAO dao = new DAO();
public static void main(String[] args) {
CompletableFuture<Void> resultPrintingCf = new CfChainsAllOfTest().fetchAllInParallelAndCollect(Lists.newArrayList(1l, 2l, 3l)).thenAccept(results -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + results);
});
resultPrintingCf.join();
}
private CompletableFuture<List<Item>> fetchAllInParallelAndCollect(List<Long> ids) {
List<CompletableFuture<Item>> cfs = Lists.newArrayList();
for (Long id : ids) {
// I want this to be an instant non-blocking operation
cfs.add(fetchSingle(id));
System.out.println("[" + Thread.currentThread().getName() + "]" + "After completable future was added to the list, id=" + id);
}
return waitAllOfAndCollect(cfs);
}
private CompletableFuture<Item> fetchSingle(Long id) {
return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
}
public CompletableFuture<Item> getPartCAndSetOnItem(Item item) {
return dao.getPartC(item.getId()).thenCompose(partC -> {
CompletableFuture<Item> cf = new CompletableFuture<>();
item.setPartC(partC);
cf.complete(item);
return cf;
});
}
public CompletableFuture<Item> getPartBAndSetOnItem(Item item) {
return dao.getPartB(item.getId()).thenCompose(partB -> {
CompletableFuture<Item> cf = new CompletableFuture<>();
item.setPartB(partB);
cf.complete(item);
return cf;
});
}
public CompletableFuture<Item> getPartAAndSetOnItem(Item item) {
return dao.getPartA(item.getId()).thenCompose(partA -> {
CompletableFuture<Item> cf = new CompletableFuture<>();
item.setPartA(partA);
cf.complete(item);
return cf;
});
}
private static <T> CompletableFuture<List<T>> waitAllOfAndCollect(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(future -> future.join()).collect(Collectors.<T> toList()));
}
static class DAO {
public CompletableFuture<PartC> getPartC(Long id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part C from database for id=" + id);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("[" + Thread.currentThread().getName() + "]" + "Part C fetched from db for id=" + id);
return new PartC();
});
}
public CompletableFuture<PartB> getPartB(Long id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part B from database for id=" + id);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("[" + Thread.currentThread().getName() + "]" + "Part B fetched from db for id=" + id);
return new PartB();
});
}
public CompletableFuture<PartA> getPartA(Long id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "Fetching Part A from database for id=" + id);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("[" + Thread.currentThread().getName() + "]" + "Part A fetched from db for id=" + id);
return new PartA();
});
}
}
static class Item {
private final Long id;
private PartA partA;
private PartB partB;
private PartC partC;
public Item(Long id) {
this.id = id;
}
public Long getId() {
return id;
}
public PartA getPartA() {
return partA;
}
public void setPartA(PartA partA) {
this.partA = partA;
}
public PartB getPartB() {
return partB;
}
public void setPartB(PartB partB) {
this.partB = partB;
}
public PartC getPartC() {
return partC;
}
public void setPartC(PartC partC) {
this.partC = partC;
}
@Override
public String toString() {
return "Item [id=" + id + ", partA=" + partA + ", partB=" + partB + ", partC=" + partC + "]";
}
}
static class PartA {
@Override
public String toString() {
return "Part A";
}
}
static class PartB {
@Override
public String toString() {
return "Part B";
}
}
static class PartC {
@Override
public String toString() {
return "Part C";
}
}
}
问题是,由于链接的关系,每个项目的处理并不是真正并行完成的。看起来completablefuture
s的链接是一个阻塞调用。我希望CFs链立即返回completablefuture
变量,并且只有在开始计算值之后才返回。
这就是说,什么是实现这种行为的最佳方式呢?多谢了。
这个方法的问题是:
private CompletableFuture<Item> fetchSingle(Long id) {
return getPartCAndSetOnItem(new Item(id)).thenCompose(this::getPartBAndSetOnItem).thenCompose(this::getPartAAndSetOnItem);
}
基本上你是在说:得到C部分,然后得到B部分,然后得到A部分。
相反,您应该调用这3个方法,然后合并结果--尽管由于将结果存储在item
上的方式,在这里可能没有必要这样做(注意这里的Java内存模型,因为您的item
在这里没有同步:对于更复杂的示例,它可能无法正常工作)。
所以,基本上:
private CompletableFuture<Item> fetchSingle(Long id) {
Item result = new Item(id);
CompletableFuture<?> c = getPartCAndSetOnItem(result);
CompletableFuture<?> b = getPartBAndSetOnItem(result);
CompletableFuture<?> a = getPartAAndSetOnItem(result);
return CompletableFuture.allOf(a, b, c).thenApply(__ -> result);
}
当然,缺点是,即使一个呼叫失败,你也要执行全部3个呼叫,但你不能鱼与熊掌兼得…
另外,您的GetPartXandSetonItem()
方法可以简化为
public CompletableFuture<Item> getPartXAndSetOnItem(Item item) {
return dao.getPartX(item.getId()).thenApply(partX -> {
item.setPartX(partX);
return item;
});
}
或者,考虑到我们不关心fetchsingle()
中的实际结果类型:
public CompletableFuture<?> getPartXAndSetOnItem(Item item) {
return dao.getPartX(item.getId()).thenRun(item::setPartX);
}
我有一个将一个项目映射到它的属性,例如,其中属性是从不同的数据源检索的。 例如,我们从数据库中获得属性,而从Solr中获得属性。 当我最初从DB检索时,我使用以避免阻塞主线程,如下所示: 然后通过对Solr的异步调用将其链接起来,这样我最终将有一个异步Hashmap将项映射到它们的属性,即(因此我循环遍历Hashmap的键,并用新属性更新值,使用访问旧属性)。 最后,我将数据映射到csv,这就是问
通常,对于CompletableFuture,我会调用thenApply或它的其他方法,以便在结果可用时立即执行某些操作。然而,我现在有一种情况,我想处理结果,直到我收到一个肯定的结果,然后忽略所有进一步的结果。 如果我只是想获取第一个可用的结果,我可以使用CompletableFuture.anyOf(尽管我讨厌为了调用anyOf而将列表转换为数组)。但那不是我想要的。我想取第一个结果,如果它没
我在某个存储库类上有一个方法,它返回。完成这些期货的代码使用一个第三方库来阻止。我打算有一个单独的有界,这个存储库类将使用它来进行这些阻塞调用。 这里有一个例子: 我的应用程序的其余部分将组成这些期货,并用结果做一些其他的事情。当提供给、、等的其他函数时,我不希望它们在存储库的上运行。 另一个例子: JavaDoc声明: 为非异步方法的从属完成提供的操作可以由完成当前CompletableFutu
现在,当我们知道提交给的工作是一个长时间运行的操作时,我们需要传递一个自定义执行器(否则默认情况下它将在上执行)。 每个控制器执行都有一个上下文,其中包含所有请求信息。如果使用,······ 如果我们只是创建自定义的并将其注入到控制器中,以便在中使用,那么我们将不会拥有所有的上下文信息。 下面是返回的某个控制器操作的示例 } 如果我们尝试在中运行类似的内容 使用CompletableFuture和
我现在有点困惑,所以我有一个方法应该返回
我想向Web服务并行发送多个请求。结果应该由成功错误收集,然后可以由调用者进一步分析。 问题:我怎样才能收集所有响应,同时也收集在方法中抛出的异常? 我的目标不是返回两个列表:一个包含所有成功的响应,另一个包含错误。