当前位置: 首页 > 知识库问答 >
问题:

如何在连锁的未来展开?

罗安和
2023-03-14

我想链一个完整的未来,使它在处理过程中扇出。我的意思是,我对一个列表有一个开放的、可完成的未来,我想对列表中的每一项进行计算。

第一步是调用m_myApi.get响应(请求,执行器)发出异步调用。

该异步调用的结果有一个getCandidates方法。我想并行分析所有这些候选者。

目前,我的代码以串行方式解析它们

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}

我想要这样的东西:

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}

共有2个答案

隆礼骞
2023-03-14

已有一个版本的thenApplyExecutor作为附加参数。

<U> CompletionStage<U>  thenApplyAsync​(Function<? super T,​? extends U> fn, Executor executor)

如果在那里传递forkjoin执行器,那么lambda中的并行流将使用传递的执行器而不是公共池。

百里修真
2023-03-14

正如在这个答案中所说,如果执行器碰巧是一个Fork/Join池,则有一个(未记录的)特性,即在一个工作线程中启动并行流将使用该执行器执行并行操作。

当您想要支持任意执行器实现时,事情就更复杂了。一个解决方案看起来像

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}

第一个关键点是,我们必须在开始等待任何作业之前提交所有可能的异步作业,以实现执行器可能支持的最大并行性。因此,我们必须在第一步收集列表中的所有期货。

在第二步中,我们可以迭代列表并加入所有期货。如果执行器是一个Fork/Join池,并且未来还没有完成,它将检测到这一点并启动一个补偿线程以恢复配置的并行性。然而,对于任意的执行者,我们不能假设这样的特性。最值得注意的是,如果执行器是单线程执行器,这可能会导致死锁。

因此,该解决方案使用CompletableFuture.allOf执行迭代和加入所有期货的操作,只有当所有期货都已经完成时。因此,此解决方案永远不会阻塞执行程序的线程,使其与任何Execator实现兼容。

 类似资料:
  • 展望未来 Pixi能做很多事情,但是不能做全部的事情!如果你想用Pixi开始制作游戏或者复杂的交互型应用,你可能会需要一些有用的库: Bump: 一个为了游戏准备的完整的2D碰撞函数集. Tink: 拖放, 按钮, 一个通用的指针和其他有用的交互工具集。 Charm: 给Pixi精灵准备的简单易用的缓动动画效果。 Dust: 创建像爆炸,火焰和魔法等粒子效果。 Sprite Utilities:

  • 我有一个现有的接口链,我想作为一个反应器运行,而不是管理我自己的线程和队列 我如何链接该结果,使其调用与批量的?

  • 如何过滤该链(带和何处)函数的输出。底部输出为WWWithout chain with where chain。如果与where链接,则会导致返回错误。底部输出返回两个不同工作的人。我需要按艺术家筛选这份工作。每个模范人物、工作和媒体使用的关系都属于个人。所以有两个连接表。 表 控制器

  • 问题内容: 信封:Akka 2.1,scala版本2.10.M6,JDK 1.7,u5 现在是我的问题:我有: 现在在第一行中,我有一个Future对象的Future,有什么方法可以在不阻塞当前线程的情况下将其转换为Future? Akka有什么方法吗?据我检查,我还没有发现…第一次发帖....不好意思的格式和组织…:〜P 问题答案: 简短答案(英语):flatMap dat sh!t 较短的答案

  • 现在,我有三个函数:updateFieldFromCollection1()、

  • 好吧,我想问题已经在标题中完成了。没什么大不了的,但我只是想知道。我有一个返回正确值或错误代码枚举项的方法。例如这样的东西: 其中返回一个Future,而只是修改数据。 现在我已经直观地编写了< code>Future[_],因为返回值是灵活的。但是在查看其他库时,我看到了< code>Future[Any]的用法。当你在函数的返回中使用匹配用例来检查它是什么数据时,这似乎也是合乎逻辑的。 例如,