我正在为给定的主键查询Dynamo DB。主键由两个UUID字段(fieldUUID1、fieldUUID2)组成。对于上面的主键组合和值列表,我有很多查询要执行。为此,我使用异步CompletableFuture和ExecutorService,线程池大小为4。
在所有查询返回结果之后,这是CompletableFuture
如果某些查询导致结果分页,即返回
lastEvaluatedKey
,则我无法知道哪个查询请求返回了此结果。
如果我做了一个. get()调用,而我收到了完全未来,这将是一个阻塞操作,这违背了使用异步的目的。有没有办法让我处理这种情况?
例如:
我可以尝试使用Compose方法,但是当lastEvaluatedKey不存在时,我如何知道需要在什么时候停止。
for (final QueryRequest queryRequest : queryRequests) {
final CompletableFuture<QueryResult> futureResult =
CompletableFuture.supplyAsync(() ->
dynamoDBClient.query(queryRequest), executorService));
if (futureResult == null) {
continue;
}
futures.add(futureResult);
}
// Wait for completion of all of the Futures provided
final CompletableFuture<Void> allfuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
// The return type of the CompletableFuture.allOf() is a
// CompletableFuture<Void>. The limitation of this method is that it does not
// return the combined results of all Futures. Instead we have to manually get
// results from Futures. CompletableFuture.join() method and Java 8 Streams API
// makes it simple:
final CompletableFuture<List<QueryResult>> allFutureList = allfuture.thenApply(val -> {
return futures.stream().map(f -> f.join()).collect(Collectors.toList());
});
final List<QueryOutcome> completableResults = new ArrayList<>();
try {
try {
// at this point all the Futures should be done, because we already executed
// CompletableFuture.allOf method.
final List<QueryResult> returnedResult = allFutureList.get();
for (final QueryResult queryResult : returnedResult) {
if (MapUtils.isNotEmpty(queryResult.getLastEvaluatedKey()) {
// how to get hold of original request and include last evaluated key ?
}
}
} finally {
}
} finally {
}
我可以依赖. get()方法,但它将是一个阻塞调用。
满足您需求的快速解决方案是更改您的未来
列表。而不是让它存储CompletableFuture
然后,一旦
allfuture
完成,您就可以迭代futures
并访问请求和结果。
然而,这里有一个更深层次的问题。一旦您访问了原始
查询请求
,您打算做什么?我猜您希望发出一个后续请求,并将exclusiveStartKey
设置为响应的lastEvaluatedKey
所包含的任何值。这意味着您将等待所有原始查询完成,然后才发布下一批查询。这是低效的:如果查询返回了lastEvaluatedKey
,您希望尽快发出后续查询。
为了实现这一点,我建议你引入一个新方法,它只需要一个
QueryRequest
对象,并返回一个CompletableFuture
用给定的请求发出查询
- 一旦结果到达,检查它。如果它的
lastEvalue atedKey
是空的,返回它作为方法的结果 - 否则,更新request.exclusiveStartKey并返回第一步。
是的,使用
CompletableFutures
(与阻塞代码相比)要做到这一点有点困难,但完全可行。
一旦你有了这个方法,你的代码需要对
query请求
中的每个请求调用这个方法一次,把返回的CompletableFuture
放在一个列表中,并在上面做一个CompletableFuture.allOf()
那个名单。一旦所有的未来完成,你可以只使用结果-不需要做问题后续查询。
我使用的是SpringBoot和SpringDataJPA,我有一个逻辑,它由3个数据库请求组成,我想并行运行。我想在将来用于此目的。 最后,我需要从5 db查询运行的结果中构建响应对象。 所以我创造了完全未来 那我打算用。与这个未来无关。但我对循环调用有问题。如何重写它以在每个请求中使用callable,我需要从请求中传递值,然后按键排序到map中?
我有两个不同的CompletableFuture,我喜欢并行运行(每个都有不同的返回类型),然后结合它们的结果: 现在我喜欢把dog.name 我想用 但我在这里很烂。
我试图用async CompletableFuture创建一个简单的示例,但我看到了一些奇怪的行为。我的想法是启动两个异步未来,一个在设定时间后激活布尔标志,另一个轮询该标志,在线程1更改该标志后释放该值。这是我的代码: 而CF类: 当我让程序运行它的课程时,它会打印以下内容: 获取可完成的 开始睡觉 睡过了 进程已完成,退出代码为0 i、 e.未来永远不会在分配的10秒内完成。这是怎么回事?
我正在为以下问题而苦苦挣扎:我有一个返回Future[Result]的方法,其中Result是我想用specs2中的数据表行检查的东西。 据我所知,以下代码每次都会阻塞并等待结果可用。 通常,我想异步进行所有调用,然后使用Future.sequence将Seq[Future[Result]]转换为Future[Seq[Result]],然后运行测试。 有什么合理的方法可以做到这一点吗?
我用谷歌搜索了这个,但仍然无法得到一个坚实的理解。我找不到任何使用构造函数的特定示例 Java博士说 未来提交(可运行任务,T结果) 提交一个可运行任务以供执行,并返回一个表示该任务的未来。Future 的 get 方法将在成功完成后返回给定的结果。 看到这一点,我的理解是在任务完成后的未来任务。get(),它将返回传递的给定结果对象,该对象与“可运行”作业无关。这是一种“可运行”作业完成的信号。
但是,testCase2不处理异常并引发错误。我是不是漏掉了什么?抱歉,我是新手。