当前位置: 首页 > 知识库问答 >
问题:

有没有好的方法从Java8流中提取数据块?

冯枫
2023-03-14

我是一个ETL过程,我正在从Spring数据存储库中检索大量实体。然后使用并行流将实体映射到不同的实体。我可以使用一个使用者将这些新实体逐个存储在另一个存储库中,或者将它们收集到一个列表中,并将其存储在单个批量操作中。第一种方法代价很高,而后者可能会超出可用的内存

共有1个答案

施默
2023-03-14

我使用块处理大容量操作的方法是使用分区分割器包装器和另一个包装器,该包装器覆盖默认的拆分策略(批大小以1024为增量的等差数列),以简单的固定批拆分。这样使用:

Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);

以下是完整的代码:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
  private final Spliterator<E> spliterator;
  private final int partitionSize;

  public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
    super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL);
    if (partitionSize <= 0) throw new IllegalArgumentException(
        "Partition size must be positive, but was " + partitionSize);
    this.spliterator = toWrap;
    this.partitionSize = partitionSize;
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
    return StreamSupport.stream(new PartitioningSpliterator(in.spliterator(), size), false);
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
    return StreamSupport.stream(
        new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
  }

  @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
    final ArrayList<E> partition = new ArrayList<>(partitionSize);
    while (spliterator.tryAdvance(partition::add) 
           && partition.size() < partitionSize);
    if (partition.isEmpty()) return false;
    action.accept(partition);
    return true;
  }

  @Override public long estimateSize() {
    final long est = spliterator.estimateSize();
    return est == Long.MAX_VALUE? est
         : est / partitionSize + (est % partitionSize > 0? 1 : 0);
  }
}
import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    characteristics |= ORDERED;
    if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
    this.characteristics = characteristics;
    this.batchSize = batchSize;
    this.est = est;
  }
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}
import static java.util.stream.StreamSupport.stream;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
  private final Spliterator<T> spliterator;

  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
    super(toWrap.characteristics(), batchSize, est);
    this.spliterator = toWrap;
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, batchSize, toWrap.estimateSize());
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap) {
    this(toWrap, 64, toWrap.estimateSize());
  }

  public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
  }

  public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
    return new FixedBatchSpliterator<>(toWrap, batchSize);
  }

  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
}
 类似资料:
  • 我有一个数据框,比如说一些投资数据。我需要根据某些条件(比如说,U类型)从这个数据帧中提取数据。有许多可用的基金类型,我只需要提取与特定基金类型匹配的数据。 funding_type有风险、种子、天使、股权等价值。我只需要数据匹配资金类型比如种子和天使 我试着跟着 这里MF1是我的数据帧。这将提供与种子基金类型相关的所有数据 我需要的条件有点像 MF1[MF1['funding_round_typ

  • 我有许多坐标(大约20000),我需要从许多NetCDF文件中提取数据,每个文件大约有30000个时间步(未来的气候场景)。使用这里的解决方案效率不高,原因是每个i,j将“dsloc”转换为“dataframe”所花费的时间(请看下面的代码)。**可以从这里下载NetCDF文件示例** 结果是: 这意味着每个i、j需要大约9秒来处理。考虑到大量的坐标和netcdf文件以及大量的时间步长,我想知道是

  • 问题内容: 是否有一个很好的方法来Map 获取和忽略案件? 问题答案: TreeMap扩展了Map并支持自定义比较器。 字符串提供默认的不区分大小写的比较器。 所以: 比较器不考虑区域设置。在其JavaDoc中阅读有关它的更多信息。

  • 问题内容: 我试图找到一种方法来查找数据库中的表的名称(如果存在)。我发现从sqlite cli我可以使用: 然后对于字段: 这显然在python中不起作用。有没有办法用python做到这一点,还是我应该只使用sqlite命令行? 问题答案: 您应该能够从表中访问表名称。 列名不能直接访问。获取它们的最简单方法是查询表并从查询结果中获取列名。

  • 这是图像,我想填充这个矩形或正方形的边缘,这样我就可以使用轮廓裁剪它。到目前为止,我所做的是,我使用canny边缘检测器查找边缘,然后使用按位_或我将这个矩形填充一点,但不是完全填充。如何填充这个矩形,或者有没有直接裁剪的方法?

  • 这是我拥有的当前代码: 我有一个CSV文件,看起来像: 但是当我运行这段代码时,它会自动跳过第一行,因为它认为这是一个标题。我不想跳过第一行并打印出所有内容。那要怎么做?谢谢。