当前位置: 首页 > 知识库问答 >
问题:

Java8批次处理作业[副本]

侯焱
2023-03-14

我有一个MyObject流,我想将它批量持久化到DB中(不是一个接一个,而是一次1000个)。所以我想做一个转变,就像

Stream<MyObject> ---> Stream<List<MyObject>>

其中每个列表都有一些固定大小的批处理大小。有没有办法用标准的Java 8 Stream API做到这一点?

共有1个答案

洪增
2023-03-14

编辑:下面的原始解决方案不起作用,因为java流不允许在同一流上调用跳过或限制多次。我最终进行了简单的处理,例如

    final AtomicInteger counter = new AtomicInteger();

    List<T> entityBatch = new ArrayList<>();

    entityStream.forEach(entity -> {
        if (counter.intValue() = batchSize) {
            processBatch(entityBatch);

            entityBatch.clear();
            counter.set(0);
        }

        entityBatch.add(entity);
        counter.incrementAndGet();
    });

    if (!entityBatch.isEmpty()) {
        processBatch(entityBatch);
    }

原始解决方案:看起来我找到了这样做的方法:

<T> Stream<List<T>> batchStream(Stream<T> stream, int batchSize) {
    return Stream.iterate(stream, s -> s.skip(batchSize)).map(s -> s.limit(batchSize).collect(toList()));
}
 类似资料:
  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml

  • 我有一个java应用程序,它对通过查询数据库中的表获得的批进行flink批处理,并将其输入kafka主题。我将如何定期安排这项工作。有flink调度程序吗?例如,我的java应用程序应该在后台持续运行,flink调度程序应该定期从数据库查询表,flink批处理它并将其输入kafka(flink批操作和输入Kafca已经在我的应用程序中完成了)。如果有人有这方面的建议,请帮忙。

  • 这似乎是一个愚蠢的问题。我正在尝试为Spring Batch作业存储库(Spring Batch 2.1.7)配置Oracle10g数据库,我能够使用在core中的org/spring框架/批/core/schema-oracle10g.sql可用的脚本创建表。我还将属性batch.data.source.init设置为false。 在干净的数据库上,我的批处理程序运行良好,成功地创建了所有批处理

  • 我正在尝试在GCP数据流中运行批处理作业。工作本身有时会占用大量内存。目前,工作一直在崩溃,因为我相信每个工作人员都在试图同时运行pcollection的多个元素。有没有办法防止每个工人一次运行多个元素?