当前位置: 首页 > 面试题库 >

Reader#lines()由于其拆分器中不可配置的批量大小策略而严重并行化

谯灿
2023-03-14
问题内容

当流源为时,我无法实现流处理的良好并行化Reader。在四核CPU上运行以下代码,我观察到最初使用3个内核,然后突然下降到2个,然后是1个内核。总体CPU利用率徘徊在50%左右。

请注意示例的以下特征:

  • 只有6,000行;
  • 每条线大约需要20毫秒的处理时间;
  • 整个过程大约需要一分钟。

这意味着所有压力都在CPU上,I / O最小。该示例是用于自动并行化的“坐鸭”。

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

我的典型输出:

          Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

为了进行比较,如果我使用经过稍微修改的变体,则首先收集到列表中,然后进行处理:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

我得到以下典型输出:

          Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

有什么可以解释这种影响的?我如何解决该问题以获得充分利用?

请注意,我最初是在servlet输入流的阅读器上观察到的,因此它并非特定于FileReader


问题答案:

这是的答案,在的源代码中有详细说明Spliterators.IteratorSpliterator,供以下人员使用BufferedReader#lines()

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

还值得注意的是常量:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

因此,在我使用6,000个元素的示例中,由于批次大小步长为1024,因此我只能得到三个批次。这可以准确地解释我的观察结果:最初使用了三个核心,然后减少为两个,然后在较小的批次完成时使用一个。同时,我尝试了一个包含60,000个元素的修改示例,然后获得了几乎100%的CPU利用率。

为了解决我的问题,我开发了下面的代码,该代码使我可以将任何现有流转换为一个流,然后Spliterator#trySplit将其划分为指定大小的批处理。从我的问题中将其用于用例的最简单方法是这样的:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

在较低的级别上,下面的类是分隔器包装器,它可以更改包装的分隔器的trySplit行为,并使其他方面保持不变。

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override html" target="_blank">public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}


 类似资料:
  • 策略分配即为策略设置应用范围,在应用范围内策略才会生效。 策略分配即为策略设置应用范围,在应用范围内策略才会生效。 说明 同一类型的策略在同一个应用范围中只能存在一条。 当在全局、域、域下的项目同时应用了不同的策略,在项目下,应用范围为该项目的策略生效。在域下其他项目,应用范围为该域的策略生效。在其他域下将应用全局范围的策略。 入口:在云管平台单击左上角导航菜单,在弹出的左侧菜单栏中单击 “系统配

  • 自动化平台 默认 fullReset noReset iOS (包括XCUITest) 测试完成后关闭模拟器。不销毁模拟器。不从真机上卸载应用。 测试完成后卸载应用, 模拟器测试结束后销毁模拟器。 测试结束后不销毁或者关闭模拟器。启动测试运行在任意正在运行的模拟器或者插入的真机。 Android 测试结束后停止应用,清理应用数据,不卸载应用包。 测试结束后停止应用,清理应用数据,卸载应用包。 不停

  • 在hazelcast映射配置中,如果我们将逐出策略设置为“无”,并使用最大空闲秒数、生存时间秒数,如下所示, 有人能解释一下,在这种情况下,最大尺寸是否有效?

  • 试图将1000个观测(目前持续时间相同)安排在500个时隙中,因此只有一半适合。使用规划实体“观察”和可空的规划变量“timeslot”。表示为ConstraintStream的约束 null 调查结果 将501个观察值调度到500个时隙中,在16s后以0hard/-1medium终止。这是意料之中的。 调度1000个观测到500个时隙在10m后终止,-499hard/-1medium。这完全出乎

  • 当我查看Spring Batch留档以并行执行步骤时,我只看到它通过XML的配置,如下所示。 我正在使用Spring批处理编写一个应用程序,我也使用了Spring Boot,我的所有配置都是使用注释完成的。是否有一个我可以使用Java配置来配置拆分步骤的方法?我查看了Spring Batch中Step interface的API文档,但它没有Split Step的默认实现。有没有办法使用现有的默认

  • https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html SIZED特征值表示在遍历或拆分之前从估计大小()返回的值表示有限大小,在没有结构源修改的情况下,表示完整遍历将遇到的元素数量的精确计数。 SUBSIZE Character 值表示 trySplit() 生成的所有拆分器都将同时具有 SIZE 和 SUBSIZ