当前位置: 首页 > 知识库问答 >
问题:

在Java8 parallelStream()中使用I/O ManagedBloker有什么问题吗?

司马萧迟
2023-03-14

Java 8中的默认“paralellStream()”使用公共的ForkJoinPool,如果在提交任务时公共池线程耗尽,这可能是一个延迟问题。然而,在许多情况下,有足够的CPU功率可用,并且任务足够短,因此这不是一个问题。如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但对于这个问题,我们假设这不是问题所在。

然而,用实际上不做任何CPU限制工作的I/O任务填充ForkJoinPool是一种引入瓶颈的方法,即使有足够的CPU能力。我明白这一点。然而,这就是我们有ManagedBlocker的目的。因此,如果我们有一个I/O任务,我们应该简单地允许ForkJoinPoolManagedBlocker中管理它。这听起来非常容易。然而,令我惊讶的是,使用ManagedBlocker对于简单的事情来说是相当复杂的API。毕竟我认为这是一个常见的问题。所以我只是构建了一个简单的实用程序方法,使ManagedBlocker易于用于常见情况:

public class BlockingTasks {

    public static<T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true;
        }

        @Override
        public boolean isReleasable() {
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}

现在,如果我想在paralell中下载几个网站的html代码,我可以这样做,而不会造成任何I/O问题:

java prettyprint-override">public static void main(String[] args) {
    final List<String> pagesHtml = Stream
        .of("https://google.com", "https://stackoverflow.com", "...")
        .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
        .collect(Collectors.toList());
}

我有点惊讶,没有像上面Java附带的BlockingTasks这样的类(或者我没有找到它?),但这并不难建造。

当我在谷歌上搜索“java 8并行流”时,我在前四个结果中看到一些文章声称,由于I/O问题,java中的Fork/Join很糟糕:

  • https://dzone.com/articles/think-twice-using-java-8

我对我的搜索词做了一些修改,虽然有很多人抱怨生活是多么可怕,但我发现没有人在谈论像上面这样的解决方案。因为我不觉得自己像马文(大脑像一个星球),Java 8已经有很长一段时间可用了,我怀疑我在那里提出的建议有很大的问题。

我做了一个小测试:

public static void main(String[] args) {
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}

public static void sleep() {
    try {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        throw new Error(e);
    }
}

我运行了一个,得到了以下结果:

18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End

所以在我的8 CPU计算机上,ForkJoinPool自然选择8个线程,完成前8个任务,最后两个任务,这意味着这需要20秒,如果有其他任务排队,池可能仍然没有使用明显空闲的CPU(除了最后10秒内的6个内核)。

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));

...而不是...

IntStream.range(0, 10).parallel().forEach((x) -> sleep());

...并得到了以下结果:

18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End

在我看来,这是可行的,额外的线程被启动来补偿我模拟的“阻塞I/O操作”(睡眠)。时间被缩短到10秒,我想如果我排队更多的任务,这些任务仍然可以使用可用的CPU能力。

如果I/O操作包装在一个管理块中,这个解决方案或者通常在流中使用I/O有什么问题吗?

共有1个答案

容远
2023-03-14

简而言之,是的,您的解决方案存在一些问题。它肯定会在并行流中使用阻塞代码进行改进,并且一些第三方库提供了类似的解决方案(例如,参见jOOλ库中的阻塞类)。然而,该解决方案不会改变Stream API中使用的内部拆分策略。Stream API创建的子任务的数量由Abstractask类中的预定义常量控制:

/**
 * Default target factor of leaf tasks for parallel decomposition.
 * To allow load balancing, we over-partition, currently to approximately
 * four tasks per processor, which enables others to help out
 * if leaf tasks are uneven or some processors are otherwise busy.
 */
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

正如您所看到的,它比普通池并行性(默认情况下是CPU内核的数量)大四倍。真正的拆分算法有点棘手,但是大致上你不能有超过4x-8x的任务,即使它们都是阻塞的。

例如,如果您有8个CPU核,那么您的线程。sleep()test将在IntStream之前正常工作。范围(0,32)(如32=8*4)。然而,对于IntStream。范围(0,64)您将有32个并行任务,每个任务处理两个输入数字,因此整个处理将花费20秒,而不是10秒。

 类似资料:
  • 问题内容: 我在这里看到许多帖子说不要使用该变量。我通常不这样做,但有时很方便。它出什么问题了? 问题答案: 但绝对没有错,采取从两个输入,并在组合方式。实际上,这就是您几乎总是想做的: 对于通常通过GET提交的纯幂等请求,您想要的数据量可能无法容纳在URL中,因此实际上已将其更改为POST请求。 对于真正生效的请求,您必须检查它是否由POST方法提交。但是,这样做的方法是显式检查,而不是依靠为G

  • 我碰巧知道,在下面的表达式中,使用将导致无限流,将始终为0。我之所以困惑是因为我认为返回的值没有被使用,即便如此,它也不应该中断之后的增量。

  • 问题内容: 我简直不敢相信我网站上正在发生的事情。当我添加此行时: 一切正常。如果我不这样做,CSS就会“混乱”,一切都会变得不同,布局也会变得“丑陋”。 这条线如何解决所有问题? 问题答案: 您正在将HTML与XHTML混合使用。 通常,声明用于区分HTMLish语言的版本(在这种情况下为HTML或XHTML)。 不同的标记语言将表现不同。我最喜欢的例子是。在浏览器中查看以下内容: XHTML

  • 问题内容: 和PHP 和有什么不一样? 问题答案: 是先递增,后递增。 pre-increment:先增加变量,然后取消引用。 后递增:取消引用然后递增 “利用PHP允许您进行后递增($ i )和预递增( $ i)这一事实。只要您未编写类似$ j = $ i ++的内容,其含义是相同的。预增量几乎快了10%,这意味着您应该在有机会时从后增量切换到预增量,特别是在紧密循环中,尤其是如果您对微优化感到

  • 本文向大家介绍你有在vue中使用过worker吗?用它解决什么问题?相关面试题,主要包含被问及你有在vue中使用过worker吗?用它解决什么问题?时的应答技巧和注意事项,需要的朋友参考一下 你有在vue中使用过worker吗?用它解决什么问题?

  • 问题内容: 关于python 3.0中reduce()函数的更改以及如何删除它,网上似乎有很多热烈的讨论。我有点难以理解为什么会这样。我发现在各种情况下使用它是很合理的。如果蔑视仅仅是主观的,我无法想象会有这么多人关心它。 我想念什么?reduce()有什么问题? 问题答案: 正如Guido在Python 3000 帖子中的reduce()的命运中所说: 所以现在reduce()。实际上,这是我一