当前位置: 首页 > 面试题库 >

Java 8并行流中的自定义线程池

萧安怡
2023-03-14
问题内容

是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。

假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。

如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。

请尝试以下示例。在单独的线程中执行一些CPU密集型任务。任务利用并行流。第一项任务已中断,因此每个步骤需要1秒钟(由线程睡眠模拟)。问题是其他线程被卡住并等待中断的任务完成。这是一个人为的示例,但请想象一个servlet应用程序,有人向共享的fork联接池提交了长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

问题答案:

实际上有一个技巧,如何在特定的fork-join池中执行并行操作。如果你将它作为一个任务在fork-join池中执行,它将停留在该位置并且不使用公共池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

该技巧基于ForkJoinTask.fork,它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者如果不包含inForkJoinPool(),则使用ForkJoinPool.commonPool()”。



 类似资料:
  • 问题内容: 是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。 假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。 如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。 请尝试以下示例。在单独的线程中执行一些CPU

  • 我正在编写一个定制的ThreadPoolExecutor,具有以下额外功能:- > 如果有理想的线程,并且随着任务的到来,将该任务分配到队列中,而不是将其添加到队列中。 如果所有线程(最大池大小)都忙,则在新任务到来时,使用RejectionHandler的reject方法将它们添加到队列中 我已经重写了线程池执行程序的java 1.5版本的执行方法。 新守则如下:- 遗留代码如下所示:- 现在正

  • SOFARPC 支持自定义业务线程池。可以为指定服务设置一个独立的业务线程池,和 SOFARPC 自身的业务线程池是隔离的。多个服务可以共用一个独立的线程池。 SOFARPC 要求自定义线程池的类型必须是 com.alipay.sofa.rpc.server.UserThreadPool。 XML 方式 如果采用 XML 的方式发布服务,可以先设定一个 class 为 com.alipay.sof

  • 它们之间有什么相同和不同之处,看起来Java并行流中有RXJava中可用的一些元素,是吗?

  • 我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一