当前位置: 首页 > 知识库问答 >
问题:

Java8并行流中的自定义线程池

干弘深
2023-03-14

是否可以为Java8并行流指定自定义线程池?我到处都找不到它。

如果我不能为不同的模块使用不同的线程池,这就意味着我不能在大多数真实世界的情况下安全地使用并行流。

请尝试以下示例。有些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被打破,因此每一步需要1秒(通过线程Hibernate模拟)。问题是其他线程会被卡住,等待中断的任务完成。这是一个虚构的示例,但假设一个servlet应用程序和一个人将一个长期运行的任务提交给共享的fork连接池。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

共有1个答案

邢思淼
2023-03-14

实际上,在特定的fork-join池中执行并行操作有一个诀窍。如果您在fork-join池中作为任务执行它,则它将保留在那里,而不使用通用的。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

此技巧基于ForkJoinAsk.fork,它指定:“安排在当前任务运行的池中异步执行此任务(如果适用),或者使用ForkJoinPool.CommonPool(),如果不是InforkJoinPool()

 类似资料:
  • 问题内容: 是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。 假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。 如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。 请尝试以下示例。在单独的线程中执行一些CPU

  • 我正在编写一个定制的ThreadPoolExecutor,具有以下额外功能:- > 如果有理想的线程,并且随着任务的到来,将该任务分配到队列中,而不是将其添加到队列中。 如果所有线程(最大池大小)都忙,则在新任务到来时,使用RejectionHandler的reject方法将它们添加到队列中 我已经重写了线程池执行程序的java 1.5版本的执行方法。 新守则如下:- 遗留代码如下所示:- 现在正

  • SOFARPC 支持自定义业务线程池。可以为指定服务设置一个独立的业务线程池,和 SOFARPC 自身的业务线程池是隔离的。多个服务可以共用一个独立的线程池。 SOFARPC 要求自定义线程池的类型必须是 com.alipay.sofa.rpc.server.UserThreadPool。 XML 方式 如果采用 XML 的方式发布服务,可以先设定一个 class 为 com.alipay.sof

  • 它们之间有什么相同和不同之处,看起来Java并行流中有RXJava中可用的一些元素,是吗?

  • 我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一