当前位置: 首页 > 知识库问答 >
问题:

带有批处理的Java 8流

庄元龙
2023-03-14

我有一个包含项目列表的大文件。

我想创建一批项目,用这个批次做一个HTTP请求(所有的项目都需要作为HTTP请求中的参数)。我可以用for循环很容易地做到这一点,但是作为Java8爱好者,我想尝试用Java8的Stream框架来编写这个(并获得延迟处理的好处)。

例子:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

我想做一些事情沿着< code>lazyFileStream.group(500)线。映射(processBatch)。collect(toList())

最好的方法是什么?

共有3个答案

邢俊悟
2023-03-14

纯Java 8解决方案:

我们可以创建一个自定义收集器来优雅地执行此操作,该收集器采用批大小Consumer 来处理每个批次:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

然后选择性地创建一个助手实用程序类:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

示例用法:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

我也在GitHub上发布了我的代码,如果有人想看的话:

链接到Github

蒋岳
2023-03-14

纯Java-8实现也是可能的:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

请注意,与JOOl不同,它可以很好地并行工作(前提是您的<code>数据<code>是一个随机访问列表)。

宫亦
2023-03-14

为了完整起见,这是一个番石榴解决方案

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

在问题中,集合是可用的,所以不需要流,可以写成,

Iterables.partition(data, batchSize).forEach(this::process);
 类似资料:
  • 我有一个带有两个数据库的Spring Batch应用程序:一个SQLDB用于Spring Batch元数据,另一个是存储所有业务数据的MongoDB。关系DB仍然使用。但是我不认为Mongo写入是在带有回滚的活动事务中完成的。以下是上官方Spring Batch留档的摘录: ItemWriter实现,使用Spring数据的MongoOperations实现写入MongoDB存储。由于MongoDB

  • 我有一个MyObject流,我想将它批量持久化到DB中(不是一个接一个,而是一次1000个)。所以我想做一个转变,就像 其中每个列表都有一些固定大小的批处理大小。有没有办法用标准的Java 8 Stream API做到这一点?

  • 我在没有ItemWriter的情况下定义了我的tasklet,如下所示: 我得到了这个错误: 配置问题:

  • 批处理配置具有spring作业,只有一个步骤 1)读取器-从csv文件读取。处理器对文件应用一些规则。Drools请运行schema-postgresql.sql来设置数据库 WRITER使用SPRING DATA JPA写入DB Writer将此称为PersonDaoImpl:

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我试图用jdk17运行一个Java8批处理,该项目是用JRE1.8.0_192构建的,我在VM中添加了以下选项: --add-modules=all-system--add-opens=java.base/java.lang=all-unnamed--add-opens=java.base/java.math=all-unnamed--add-opens=java.base/java.net=al