当流源为时,我无法实现流处理的良好并行化Reader
。在四核CPU上运行以下代码,我观察到最初使用3个内核,然后突然下降到2个,然后是1个内核。总体CPU利用率徘徊在50%左右。
请注意示例的以下特征:
这意味着所有压力都在CPU上,I / O最小。该示例是用于自动并行化的“坐鸭”。
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...
public class Main
{
static final AtomicLong totalTime = new AtomicLong();
public static void main(String[] args) throws IOException {
final long start = System.nanoTime();
final Path inputPath = createInput();
System.out.println("Start processing");
try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
Files.lines(inputPath).parallel().map(Main::processLine)
.forEach(w::println);
}
final double cpuTime = totalTime.get(),
realTime = System.nanoTime()-start;
final int cores = Runtime.getRuntime().availableProcessors();
System.out.println(" Cores: " + cores);
System.out.format(" CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
System.out.format(" Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
}
private static String processLine(String line) {
final long localStart = System.nanoTime();
double ret = 0;
for (int i = 0; i < line.length(); i++)
for (int j = 0; j < line.length(); j++)
ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
final long took = System.nanoTime()-localStart;
totalTime.getAndAdd(took);
return NANOSECONDS.toMillis(took) + " " + ret;
}
private static Path createInput() throws IOException {
final Path inputPath = Paths.get("input.txt");
try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
for (int i = 0; i < 6_000; i++) {
final String text = String.valueOf(System.nanoTime());
for (int j = 0; j < 25; j++) w.print(text);
w.println();
}
}
return inputPath;
}
}
我的典型输出:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
为了进行比较,如果我使用经过稍微修改的变体,则首先收集到列表中,然后进行处理:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
.forEach(w::println);
我得到以下典型输出:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
有什么可以解释这种影响的?我如何解决该问题以获得充分利用?
请注意,我最初是在servlet输入流的阅读器上观察到的,因此它并非特定于FileReader
。
这是的答案,在的源代码中有详细说明Spliterators.IteratorSpliterator
,供以下人员使用BufferedReader#lines()
:
@Override
public Spliterator<T> trySplit() {
/*
* Split into arrays of arithmetically increasing batch
* sizes. This will only improve parallel performance if
* per-element Consumer actions are more costly than
* transferring them into an array. The use of an
* arithmetic progression in split sizes provides overhead
* vs parallelism bounds that do not particularly favor or
* penalize cases of lightweight vs heavyweight element
* operations, across combinations of #elements vs #cores,
* whether or not either are known. We generate
* O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
* potential speedup.
*/
Iterator<? extends T> i;
long s;
if ((i = it) == null) {
i = it = collection.iterator();
s = est = (long) collection.size();
}
else
s = est;
if (s > 1 && i.hasNext()) {
int n = batch + BATCH_UNIT;
if (n > s)
n = (int) s;
if (n > MAX_BATCH)
n = MAX_BATCH;
Object[] a = new Object[n];
int j = 0;
do { a[j] = i.next(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
est -= j;
return new ArraySpliterator<>(a, 0, j, characteristics);
}
return null;
}
还值得注意的是常量:
static final int BATCH_UNIT = 1 << 10; // batch array size increment
static final int MAX_BATCH = 1 << 25; // max batch array size;
因此,在我使用6,000个元素的示例中,由于批次大小步长为1024,因此我只能得到三个批次。这可以准确地解释我的观察结果:最初使用了三个核心,然后减少为两个,然后在较小的批次完成时使用一个。同时,我尝试了一个包含60,000个元素的修改示例,然后获得了几乎100%的CPU利用率。
为了解决我的问题,我开发了下面的代码,该代码使我可以将任何现有流转换为一个流,然后Spliterator#trySplit
将其划分为指定大小的批处理。从我的问题中将其用于用例的最简单方法是这样的:
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
在较低的级别上,下面的类是分隔器包装器,它可以更改包装的分隔器的trySplit
行为,并使其他方面保持不变。
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
private final Spliterator<T> spliterator;
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
final int c = toWrap.characteristics();
this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
this.spliterator = toWrap;
this.est = est;
this.batchSize = batchSize;
}
public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
this(toWrap, toWrap.estimateSize(), batchSize);
}
public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
}
@Override html" target="_blank">public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<>();
if (!spliterator.tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics());
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
@Override public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override public long estimateSize() { return est; }
@Override public int characteristics() { return characteristics; }
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override public void accept(T value) { this.value = value; }
}
}
策略分配即为策略设置应用范围,在应用范围内策略才会生效。 策略分配即为策略设置应用范围,在应用范围内策略才会生效。 说明 同一类型的策略在同一个应用范围中只能存在一条。 当在全局、域、域下的项目同时应用了不同的策略,在项目下,应用范围为该项目的策略生效。在域下其他项目,应用范围为该域的策略生效。在其他域下将应用全局范围的策略。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “系统配
自动化平台 默认 fullReset noReset iOS (包括XCUITest) 测试完成后关闭模拟器。不销毁模拟器。不从真机上卸载应用。 测试完成后卸载应用, 模拟器测试结束后销毁模拟器。 测试结束后不销毁或者关闭模拟器。启动测试运行在任意正在运行的模拟器或者插入的真机。 Android 测试结束后停止应用,清理应用数据,不卸载应用包。 测试结束后停止应用,清理应用数据,卸载应用包。 不停
在hazelcast映射配置中,如果我们将逐出策略设置为“无”,并使用最大空闲秒数、生存时间秒数,如下所示, 有人能解释一下,在这种情况下,最大尺寸是否有效?
试图将1000个观测(目前持续时间相同)安排在500个时隙中,因此只有一半适合。使用规划实体“观察”和可空的规划变量“timeslot”。表示为ConstraintStream的约束 null 调查结果 将501个观察值调度到500个时隙中,在16s后以0hard/-1medium终止。这是意料之中的。 调度1000个观测到500个时隙在10m后终止,-499hard/-1medium。这完全出乎
当我查看Spring Batch留档以并行执行步骤时,我只看到它通过XML的配置,如下所示。 我正在使用Spring批处理编写一个应用程序,我也使用了Spring Boot,我的所有配置都是使用注释完成的。是否有一个我可以使用Java配置来配置拆分步骤的方法?我查看了Spring Batch中Step interface的API文档,但它没有Split Step的默认实现。有没有办法使用现有的默认
https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html SIZED特征值表示在遍历或拆分之前从估计大小()返回的值表示有限大小,在没有结构源修改的情况下,表示完整遍历将遇到的元素数量的精确计数。 SUBSIZE Character 值表示 trySplit() 生成的所有拆分器都将同时具有 SIZE 和 SUBSIZ