我有一个包含项目列表的大文件。
我想创建一批项目,用这个批次做一个HTTP请求(所有的项目都需要作为HTTP请求中的参数)。我可以用for
循环很容易地做到这一点,但是作为Java8爱好者,我想尝试用Java8的Stream框架来编写这个(并获得延迟处理的好处)。
例子:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
我想做一些事情沿着< code>lazyFileStream.group(500)线。映射(processBatch)。collect(toList())
最好的方法是什么?
纯Java 8解决方案:
我们可以创建一个自定义收集器来优雅地执行此操作,该收集器采用批大小
和 Consumer
来处理每个批次:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.Objects.requireNonNull;
/**
* Collects elements in the stream and calls the supplied batch processor
* after the configured batch size is reached.
*
* In case of a parallel stream, the batch processor may be called with
* elements less than the batch size.
*
* The elements are not kept in memory, and the final result will be an
* empty list.
*
* @param <T> Type of the elements being collected
*/
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
private final int batchSize;
private final Consumer<List<T>> batchProcessor;
/**
* Constructs the batch collector
*
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
*/
BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
batchProcessor = requireNonNull(batchProcessor);
this.batchSize = batchSize;
this.batchProcessor = batchProcessor;
}
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
public BiConsumer<List<T>, T> accumulator() {
return (ts, t) -> {
ts.add(t);
if (ts.size() >= batchSize) {
batchProcessor.accept(ts);
ts.clear();
}
};
}
public BinaryOperator<List<T>> combiner() {
return (ts, ots) -> {
// process each parallel list without checking for batch size
// avoids adding all elements of one to another
// can be modified if a strict batching mode is required
batchProcessor.accept(ts);
batchProcessor.accept(ots);
return Collections.emptyList();
};
}
public Function<List<T>, List<T>> finisher() {
return ts -> {
batchProcessor.accept(ts);
return Collections.emptyList();
};
}
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
然后选择性地创建一个助手实用程序类:
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
public class StreamUtils {
/**
* Creates a new batch collector
* @param batchSize the batch size after which the batchProcessor should be called
* @param batchProcessor the batch processor which accepts batches of records to process
* @param <T> the type of elements being processed
* @return a batch collector instance
*/
public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
return new BatchCollector<T>(batchSize, batchProcessor);
}
}
示例用法:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
我也在GitHub上发布了我的代码,如果有人想看的话:
链接到Github
纯Java-8实现也是可能的:
int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
.mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
.forEach(batch -> process(batch));
请注意,与JOOl不同,它可以很好地并行工作(前提是您的<code>数据<code>是一个随机访问列表)。
为了完整起见,这是一个番石榴解决方案。
Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
在问题中,集合是可用的,所以不需要流,可以写成,
Iterables.partition(data, batchSize).forEach(this::process);
我有一个带有两个数据库的Spring Batch应用程序:一个SQLDB用于Spring Batch元数据,另一个是存储所有业务数据的MongoDB。关系DB仍然使用。但是我不认为Mongo写入是在带有回滚的活动事务中完成的。以下是上官方Spring Batch留档的摘录: ItemWriter实现,使用Spring数据的MongoOperations实现写入MongoDB存储。由于MongoDB
我有一个MyObject流,我想将它批量持久化到DB中(不是一个接一个,而是一次1000个)。所以我想做一个转变,就像 其中每个列表都有一些固定大小的批处理大小。有没有办法用标准的Java 8 Stream API做到这一点?
我在没有ItemWriter的情况下定义了我的tasklet,如下所示: 我得到了这个错误: 配置问题:
批处理配置具有spring作业,只有一个步骤 1)读取器-从csv文件读取。处理器对文件应用一些规则。Drools请运行schema-postgresql.sql来设置数据库 WRITER使用SPRING DATA JPA写入DB Writer将此称为PersonDaoImpl:
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
我试图用jdk17运行一个Java8批处理,该项目是用JRE1.8.0_192构建的,我在VM中添加了以下选项: --add-modules=all-system--add-opens=java.base/java.lang=all-unnamed--add-opens=java.base/java.math=all-unnamed--add-opens=java.base/java.net=al