Java 8中的默认“paralellStream()”使用公共的ForkJoinPool,如果在提交任务时公共池线程耗尽,这可能是一个延迟问题。然而,在许多情况下,有足够的CPU功率可用,并且任务足够短,因此这不是一个问题。如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但对于这个问题,我们假设这不是问题所在。
然而,用实际上不做任何CPU限制工作的I/O任务填充ForkJoinPool
是一种引入瓶颈的方法,即使有足够的CPU能力。我明白这一点。然而,这就是我们有ManagedBlocker
的目的。因此,如果我们有一个I/O任务,我们应该简单地允许ForkJoinPool
在ManagedBlocker
中管理它。这听起来非常容易。然而,令我惊讶的是,使用ManagedBlocker
对于简单的事情来说是相当复杂的API。毕竟我认为这是一个常见的问题。所以我只是构建了一个简单的实用程序方法,使ManagedBlocker
易于用于常见情况:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
现在,如果我想在paralell中下载几个网站的html代码,我可以这样做,而不会造成任何I/O问题:
java prettyprint-override">public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
我有点惊讶,没有像上面Java附带的BlockingTasks这样的类(或者我没有找到它?),但这并不难建造。
当我在谷歌上搜索“java 8并行流”时,我在前四个结果中看到一些文章声称,由于I/O问题,java中的Fork/Join很糟糕:
我对我的搜索词做了一些修改,虽然有很多人抱怨生活是多么可怕,但我发现没有人在谈论像上面这样的解决方案。因为我不觉得自己像马文(大脑像一个星球),Java 8已经有很长一段时间可用了,我怀疑我在那里提出的建议有很大的问题。
我做了一个小测试:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
我运行了一个,得到了以下结果:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
所以在我的8 CPU计算机上,ForkJoinPool自然选择8个线程,完成前8个任务,最后两个任务,这意味着这需要20秒,如果有其他任务排队,池可能仍然没有使用明显空闲的CPU(除了最后10秒内的6个内核)。
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
...而不是...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
...并得到了以下结果:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
在我看来,这是可行的,额外的线程被启动来补偿我模拟的“阻塞I/O操作”(睡眠)。时间被缩短到10秒,我想如果我排队更多的任务,这些任务仍然可以使用可用的CPU能力。
如果I/O操作包装在一个管理块中,这个解决方案或者通常在流中使用I/O有什么问题吗?
简而言之,是的,您的解决方案存在一些问题。它肯定会在并行流中使用阻塞代码进行改进,并且一些第三方库提供了类似的解决方案(例如,参见jOOλ库中的阻塞
类)。然而,该解决方案不会改变Stream API中使用的内部拆分策略。Stream API创建的子任务的数量由Abstractask
类中的预定义常量控制:
/**
* Default target factor of leaf tasks for parallel decomposition.
* To allow load balancing, we over-partition, currently to approximately
* four tasks per processor, which enables others to help out
* if leaf tasks are uneven or some processors are otherwise busy.
*/
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
正如您所看到的,它比普通池并行性(默认情况下是CPU内核的数量)大四倍。真正的拆分算法有点棘手,但是大致上你不能有超过4x-8x的任务,即使它们都是阻塞的。
例如,如果您有8个CPU核,那么您的线程。sleep()test将在IntStream之前正常工作。范围(0,32)(如32=8*4)。然而,对于IntStream。范围(0,64)
您将有32个并行任务,每个任务处理两个输入数字,因此整个处理将花费20秒,而不是10秒。
问题内容: 我在这里看到许多帖子说不要使用该变量。我通常不这样做,但有时很方便。它出什么问题了? 问题答案: 但绝对没有错,采取从两个输入,并在组合方式。实际上,这就是您几乎总是想做的: 对于通常通过GET提交的纯幂等请求,您想要的数据量可能无法容纳在URL中,因此实际上已将其更改为POST请求。 对于真正生效的请求,您必须检查它是否由POST方法提交。但是,这样做的方法是显式检查,而不是依靠为G
我碰巧知道,在下面的表达式中,使用将导致无限流,将始终为0。我之所以困惑是因为我认为返回的值没有被使用,即便如此,它也不应该中断之后的增量。
问题内容: 我简直不敢相信我网站上正在发生的事情。当我添加此行时: 一切正常。如果我不这样做,CSS就会“混乱”,一切都会变得不同,布局也会变得“丑陋”。 这条线如何解决所有问题? 问题答案: 您正在将HTML与XHTML混合使用。 通常,声明用于区分HTMLish语言的版本(在这种情况下为HTML或XHTML)。 不同的标记语言将表现不同。我最喜欢的例子是。在浏览器中查看以下内容: XHTML
问题内容: 和PHP 和有什么不一样? 问题答案: 是先递增,后递增。 pre-increment:先增加变量,然后取消引用。 后递增:取消引用然后递增 “利用PHP允许您进行后递增($ i )和预递增( $ i)这一事实。只要您未编写类似$ j = $ i ++的内容,其含义是相同的。预增量几乎快了10%,这意味着您应该在有机会时从后增量切换到预增量,特别是在紧密循环中,尤其是如果您对微优化感到
本文向大家介绍你有在vue中使用过worker吗?用它解决什么问题?相关面试题,主要包含被问及你有在vue中使用过worker吗?用它解决什么问题?时的应答技巧和注意事项,需要的朋友参考一下 你有在vue中使用过worker吗?用它解决什么问题?
问题内容: 关于python 3.0中reduce()函数的更改以及如何删除它,网上似乎有很多热烈的讨论。我有点难以理解为什么会这样。我发现在各种情况下使用它是很合理的。如果蔑视仅仅是主观的,我无法想象会有这么多人关心它。 我想念什么?reduce()有什么问题? 问题答案: 正如Guido在Python 3000 帖子中的reduce()的命运中所说: 所以现在reduce()。实际上,这是我一