当前位置: 首页 > 知识库问答 >
问题:

如何(全局)替换Java并行流的公共线程池后端?

谷泳
2023-03-14

我希望全局替换Java并行流默认使用的公共线程池,例如

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});

我知道,通过将这样的指令提交到专用线程池(参见Java8并行流中的自定义线程池),可以使用专用的ForkJoinPool。这里的问题是

  • 是否可以用其他实现(例如executors.NewFixedThreadPool(10)
  • 来替换常见的ForkJoinPool
  • 是否可以通过某些全局设置(例如,某些JVM属性)来实现?

备注:我喜欢替换f/j池的原因是,它似乎有一个bug,使它不能用于嵌套的并行循环。

嵌套并行循环性能较差,可能导致死锁,请参见http://christian-fries.de/blog/files/2014-nested-java-8-Paralle-foreach.html

例如:以下代码导致死锁:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});

我的问题是如何取代FJP。如果您想讨论嵌套的并行循环,您可以检查嵌套的Java8并行forEach循环Perforeach Poor。这种行为在意料之中吗?.

共有1个答案

柏夕
2023-03-14

我认为这不是stream API的使用方式。您似乎(错误地)使用它来简单地执行并行任务(专注于任务,而不是数据),而不是进行并行流处理(专注于流中的数据)。您的代码在某种程度上违反了流的一些主要原则。(我写的是“莫名其妙”,因为它不是真的被禁止,而是被劝阻):避免状态和副作用。

除此之外(或者可能是因为副作用),您在外部循环中使用了重度同步,这是无害的!

虽然文档中没有提到,但并行流在内部使用了通用的forkjoinpool。不管这是不是缺少文件,我们必须接受这一事实。forkjointask的JavaDoc声明:

定义和使用可能阻塞的ForkJoinTasks是可能的,但要做到这一点需要三个进一步的考虑:(1)如果任何其他任务都依赖于一个阻塞外部同步或I/O的任务,则完成很少的任务。从不联接的事件样式异步任务(例如,那些子类化CountedCompleter的任务)经常属于这一类。(2)最小化资源影响,任务要小;理想情况下只执行(可能的)阻塞操作。(3)除非使用ForkJoinPool.ManagedBlocker API,或者已知可能被阻塞的任务数少于池的ForkJoinPool.GetParallelism级别,否则池无法保证将有足够的线程来确保进度或良好的性能。

同样,您似乎在使用流来替换简单的for-loop和executor服务。

  • 如果只想并行执行n任务,请使用executionservice
  • 如果有一个任务创建子任务的更复杂示例,请考虑改用forkjoinpool(与forkjoinasks)。(它确保线程数量恒定,不会因为等待其他线程完成的任务太多而出现死锁的危险,因为等待的任务不会阻塞它们正在执行的线程)。
  • 如果要(并行)处理数据,请考虑使用流API。
  • 不能“安装”自定义公用池。它是在内部私有静态代码中创建的。
  • 但是您可以使用某些系统属性(请参见ForkJoinPool的JavaDoc)
  • 来影响公共池的并行性、线程工厂和异常处理程序

不要混淆executionserviceforkjoinpool。他们(通常)不能互相替代!

 类似资料:
  • 问题内容: 是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。 假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。 如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。 请尝试以下示例。在单独的线程中执行一些CPU

  • 是否可以为Java8并行流指定自定义线程池?我到处都找不到它。 如果我不能为不同的模块使用不同的线程池,这就意味着我不能在大多数真实世界的情况下安全地使用并行流。 请尝试以下示例。有些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被打破,因此每一步需要1秒(通过线程Hibernate模拟)。问题是其他线程会被卡住,等待中断的任务完成。这是一个虚构的示例,但假设一个servlet

  • 我注意到一些web框架(如Play Framework)允许您配置多个不同大小的线程池(线程池中的线程数)。假设我们在单核的单机中运行这个游戏。拥有多个线程池不会有很大的开销吗?

  • 问题内容: 请参阅下面的简单示例,该示例计算列表中每个单词的出现次数: 最后是。 但是我的数据流很大,我想并行化作业,所以我写: 但是我注意到这很简单,所以我想知道是否需要显式请求并发映射以确保线程安全: 非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本? 问题答案: 非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本? 在并行

  • ConsumptionExecutor: 然而,我想使用Akka流/Akka Actor,在这里我不需要给出固定的线程池大小,Akka负责所有的事情。我对Akka和流媒体和演员的概念很陌生。有人能给我任何线索,以示例代码的形式,以适合我的用例?提前道谢!

  • 我想同时向一个webservice发送webservice调用。最多应有20个并行请求等待webservice响应。任何其他请求都应该等待它们完成。 如果一个用户向我发送一个请求,这通常会导致向目标服务器发送5个并行请求。因此,我一次最多可以服务20/5=4个用户。其他人将不得不等待,这很好。或者被高负荷拒绝。 问题:我应该使用哪个线程池,以及如何配置它? 我读了上面的内容如下:主池可以向网络服务