当前位置: 首页 > 知识库问答 >
问题:

强制Java stream提前执行管道的一部分,以便将阻塞任务提交到线程池

徐阳炎
2023-03-14

我有一个想要处理的对象列表,而Java8流API看起来是最干净、最易读的方法。

但是我需要在这些对象上执行的一些操作包括阻塞IO(比如读取数据库)--所以我希望将这些操作提交到一个包含几十个线程的线程池中。

一开始我想做一些类似的事情:

myObjectList
    .stream()
    .filter(wrapPredicate(obj -> threadPoolExecutor.submit(
            () -> longQuery(obj)          // returns boolean
    ).get())                              // wait for future & unwrap boolean
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)  // returns Optional
    ))
    .map(wrapFunction(Future::get))
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());

WrapPredicateWrapFunction仅用于重新抛出已检查的异常。

但是,显然,对future.get()调用将阻塞流的线程,直到对给定对象的查询完成为止,并且流在此之前不会进行。因此一次只处理一个对象,线程池没有意义。

我可以使用一个并行流,但我需要希望缺省的forkjoinpool就足够了。或者只是增加“java.util.concurrent.forkJoinPool.common.parallelism”,但我不想为了该流而更改整个应用程序的设置。我可以在自定义的forkjoinpool中创建流,但我发现它不能保证这种级别的并行性。

所以我以这样的方式结束,只是为了保证在等待futures完成之前,所有需要的任务都提交到线程池:

myObjectList
    .stream()
    .map(obj -> Pair.of(obj, threadPoolExecutor.submit(
                    () -> longQuery(obj)             // returns boolean
        ))
    )
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .filter(wrapPredicate(p -> p.getRight().get()))  // wait & unwrap future after all tasks are submitted
    .map(Pair::getLeft)
    .map(filtered -> threadPoolExecutor.submit(
            () -> anotherQuery(filtered)             // returns Optional
    ))
    .collect(toList()).stream()                      // terminate stream to actually submit tasks to the pool
    .map(wrapFunction(Future::get))                  // wait & unwrap futures after all submitted
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(toList());

有什么明显更好的方法来实现这一点吗?

一种更优雅的方法告诉流“现在就对流中的每个对象执行流水线步骤”,然后保持.Collect(toList()).stream()以外的处理,以及一种更好的方法来过滤Future的效果,而不是将其打包到Apache Commonspair中,以便稍后对pair::getright进行过滤?或者是一种完全不同的解决问题的方法?

共有1个答案

严扬
2023-03-14

您可以通过使用

myObjectList.stream()
    .map(obj -> threadPoolExecutor.submit(
                    () -> longQuery(obj)? anotherQuery(obj).orElse(null): null))
    .collect(toList()).stream()
    .map(wrapFunction(Future::get))
    .filter(Objects::nonNull)
    .collect(toList());

有一点是,如果您稍后将anotherQuery提交给同一个执行器,那么并发性将不会有任何改进。因此,您可以在longquery返回true之后直接执行它。此时,obj仍在作用域内,因此您可以将其用于AnotherQuery

通过提取optional的结果,使用null作为失败的表示,我们可以得到缺省结果的相同表示,不管是因为longquery返回了false还是因为anotherquery返回了空的optional。因此,在提取future的结果之后,我们所要做的就是.filter(Objects::nonnull)

在获得实际结果之前,您必须首先提交作业(收集future)的逻辑不会改变。反正也没办法绕过。其他方法或框架所能提供的所有便利性都是隐藏这些对象的实际临时存储。

 类似资料:
  • 要并行或异步运行一些东西,我可以使用ExecutorService:

  • 当我试图提交这个gradle.build文件时,它失败了,并显示以下消息: 当我查看日志时,我看到: 而且,令人惊讶的是,在日志的末尾出现了关于丢失文件的奇怪消息:

  • 我们有一个场景,提交给ThreadPoolExecitor的任务长时间运行。当线程池启动时,我们以核心池大小=5、最大池大小=20和队列大小为10启动它。在我们的应用程序中,大约有10个任务被提交。大多数时候,这些任务运行几分钟/小时,然后完成。但是有一种情况,所有5个任务都挂在I/O上。因此,我的核心池大小达到了最大值,但我的ThreadpoolExector队列未满。所以额外的5个任务从未有机

  • 我有一个包含数千行的数据文件。我正在读取它们并将它们保存在数据库中。我想以50行的批次多线程处理这个过程。当我在文件中读取时,10行被提交给ExecutorService。 我可以在一段时间内循环执行以下操作,直到行结束。。。。 但是,如果处理10行需要时间,我不想将整个文件读入内存。我只想在其中一个线程返回之前提交5个线程,然后提交下一个线程。 假设一个线程需要20秒来保存10条记录,我不希望被

  • 我一直在寻找这样一种情况的解决方案:我有一个调用项的哈希集,并且我要将这个集提交给执行器进行并行执行。现在我想只要任何提交的任务完成,我应该能够分配一个新的Callable到Executor。 我尝试了这段代码,但是如果我使用Executor.Invoke,那么Executor将等待直到所有任务完成,如果我使用Executor.Submit,那么任务将按顺序完成。如有任何帮助,我们将不胜感激。

  • 我有一个应用程序,允许用户批量向图像添加水印。该应用程序将只使用一个线程,并且一次只能添加一个水印。 我希望用户能够更改一次运行的水印任务[线程]的数量:可能在设置中为[1-5],并且我不能使用固定的ThreadPool,因为它具有固定的池大小。 我研究了如何使用线程池执行器(ThreadPoolExecutor)私有静态线程池执行器(ThreadPoolExecutor)=(ThreadPool