当前位置: 首页 > 知识库问答 >
问题:

Dynamo DB中的分页结果具有完全的未来

夹谷浩博
2023-03-14

我正在为给定的主键查询Dynamo DB。主键由两个UUID字段(fieldUUID1、fieldUUID2)组成。对于上面的主键组合和值列表,我有很多查询要执行。为此,我使用异步CompletableFuture和ExecutorService,线程池大小为4。

在所有查询返回结果之后,这是CompletableFuture

如果某些查询导致结果分页,即返回lastEvaluatedKey,则我无法知道哪个查询请求返回了此结果。

如果我做了一个. get()调用,而我收到了完全未来,这将是一个阻塞操作,这违背了使用异步的目的。有没有办法让我处理这种情况?

例如:

我可以尝试使用Compose方法,但是当lastEvaluatedKey不存在时,我如何知道需要在什么时候停止。

for (final QueryRequest queryRequest : queryRequests) {
    final CompletableFuture<QueryResult> futureResult =
        CompletableFuture.supplyAsync(() ->
            dynamoDBClient.query(queryRequest), executorService));

    if (futureResult == null) {
        continue;
    }

    futures.add(futureResult);
}

// Wait for completion of all of the Futures provided
final CompletableFuture<Void> allfuture = CompletableFuture
    .allOf(futures.toArray(new CompletableFuture[futures.size()]));

// The return type of the CompletableFuture.allOf() is a
// CompletableFuture<Void>. The limitation of this method is that it does not
// return the combined results of all Futures. Instead we have to manually get
// results from Futures. CompletableFuture.join() method and Java 8 Streams API
// makes it simple:
final CompletableFuture<List<QueryResult>> allFutureList = allfuture.thenApply(val -> {
    return futures.stream().map(f -> f.join()).collect(Collectors.toList());
});


final List<QueryOutcome> completableResults = new ArrayList<>();
try {
    try {
        // at this point all the Futures should be done, because we already executed
        // CompletableFuture.allOf method.
        final List<QueryResult> returnedResult = allFutureList.get();
        for (final QueryResult queryResult : returnedResult) {
            if (MapUtils.isNotEmpty(queryResult.getLastEvaluatedKey()) {
                // how to get hold of original request  and include last evaluated key ?
            }
        }
    } finally {

    }
} finally {

}

我可以依赖. get()方法,但它将是一个阻塞调用。


共有1个答案

璩涛
2023-03-14

满足您需求的快速解决方案是更改您的未来列表。而不是让它存储CompletableFuture

然后,一旦allfuture完成,您就可以迭代futures并访问请求和结果。

然而,这里有一个更深层次的问题。一旦您访问了原始查询请求,您打算做什么?我猜您希望发出一个后续请求,并将exclusiveStartKey设置为响应的lastEvaluatedKey所包含的任何值。这意味着您将等待所有原始查询完成,然后才发布下一批查询。这是低效的:如果查询返回了lastEvaluatedKey,您希望尽快发出后续查询。

为了实现这一点,我建议你引入一个新方法,它只需要一个QueryRequest对象,并返回一个CompletableFuture

  • 用给定的请求发出查询
  • 一旦结果到达,检查它。如果它的lastEvalue atedKey是空的,返回它作为方法的结果
  • 否则,更新request.exclusiveStartKey并返回第一步。

是的,使用CompletableFutures(与阻塞代码相比)要做到这一点有点困难,但完全可行。

一旦你有了这个方法,你的代码需要对query请求中的每个请求调用这个方法一次,把返回的CompletableFuture放在一个列表中,并在上面做一个CompletableFuture.allOf()那个名单。一旦所有的未来完成,你可以只使用结果-不需要做问题后续查询。

 类似资料:
  • 我使用的是SpringBoot和SpringDataJPA,我有一个逻辑,它由3个数据库请求组成,我想并行运行。我想在将来用于此目的。 最后,我需要从5 db查询运行的结果中构建响应对象。 所以我创造了完全未来 那我打算用。与这个未来无关。但我对循环调用有问题。如何重写它以在每个请求中使用callable,我需要从请求中传递值,然后按键排序到map中?

  • 我有两个不同的CompletableFuture,我喜欢并行运行(每个都有不同的返回类型),然后结合它们的结果: 现在我喜欢把dog.name 我想用 但我在这里很烂。

  • 我试图用async CompletableFuture创建一个简单的示例,但我看到了一些奇怪的行为。我的想法是启动两个异步未来,一个在设定时间后激活布尔标志,另一个轮询该标志,在线程1更改该标志后释放该值。这是我的代码: 而CF类: 当我让程序运行它的课程时,它会打印以下内容: 获取可完成的 开始睡觉 睡过了 进程已完成,退出代码为0 i、 e.未来永远不会在分配的10秒内完成。这是怎么回事?

  • 我正在为以下问题而苦苦挣扎:我有一个返回Future[Result]的方法,其中Result是我想用specs2中的数据表行检查的东西。 据我所知,以下代码每次都会阻塞并等待结果可用。 通常,我想异步进行所有调用,然后使用Future.sequence将Seq[Future[Result]]转换为Future[Seq[Result]],然后运行测试。 有什么合理的方法可以做到这一点吗?

  • 我用谷歌搜索了这个,但仍然无法得到一个坚实的理解。我找不到任何使用构造函数的特定示例 Java博士说 未来提交(可运行任务,T结果) 提交一个可运行任务以供执行,并返回一个表示该任务的未来。Future 的 get 方法将在成功完成后返回给定的结果。 看到这一点,我的理解是在任务完成后的未来任务。get(),它将返回传递的给定结果对象,该对象与“可运行”作业无关。这是一种“可运行”作业完成的信号。

  • 但是,testCase2不处理异常并引发错误。我是不是漏掉了什么?抱歉,我是新手。