我正在尝试使用多行IteamReader来跟踪spring批处理示例https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline
我遇到如下编译错误-
我确信有一些与泛型相关的东西,因为它在寻找实现ItemReader的类,但是Aggregate ItemReader实现了ItemReader
public class AggregateItemReader<T> implements ItemReader<List<T>> {
你可以在这里找到我的代码-https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java
更新:
为了调用ItemStreamReader open()方法,我抑制了泛型并更新了AggregateItemReader,如下所示。
public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);
private ItemStreamReader<AggregateItem<T>> itemReader;
我注意到ItemWriter正在编写记录列表,而不是每行记录
[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]
当我尝试添加处理器时,它会抱怨处理器无法将列表转换为交易对象。
@Bean
public ItemProcessor<Trade, Trade> processor() {
return new ItemProcessor<Trade, Trade>() {
@Override
public Trade process(Trade item) throws Exception {
item.setProcessed(true);
return item;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
AggregateItemReader reader,
ItemProcessor processor,
FlatFileItemWriter writer,
StepItemReadListener itemReadListener) {
return stepBuilderFactory.get("multiLineStep")
.chunk(1)
.reader(reader)
.writer(writer)
.processor(processor)
.build();
}
错误:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
at my.demo.batch.BatchConfiguration$2.process(BatchConfiguration.java:1) ~[main/:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.html" target="_blank">springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]
帮助:
您需要在批次级别上处理的项目类型保持一致。根据您的步骤定义,它是交易
。通过调用
问题是你的阅读器是
ItemReader类型的
如果您想使用
Aggregate ItemReader
,您需要将其包装到一个自定义阅读器中,该阅读器作为适配器工作,实际上返回Trade
项,而不是List
例如,自定义
read
方法可以如下所示:
public Trade read() throws Exception {
if (queue.isEmpty()) {
List<Trade> trades = aggregateItemReader.read();
if (trades != null) {
queue.addAll(trades);
}
}
return queue.poll();
}
将
队列
初始化为
private Deque<Trade> queue = new ArrayDeque<>();
我是Spring批处理的新手,我只想问如何从多行结果集中检索数据。我有以下场景: > 有两个不同的表说员工 使用时,我只能创建一个工资单子级,但该表可能有多个子级。请帮助...
我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢
我尝试了另一个解决方案,使用自定义的FlatItemReader,我重写了close()方法,在关闭后删除文件。这个解决方案发生了一个奇怪的事情,我的close方法被调用了两次,有些时候我可以删除文件,有些时候我不能删除它... 请参阅下面的日志: 我的close方法的代码: 我的BatchConfig.java: filedeletingTasklet.java
知道为什么下面的代码会出现以下错误吗? java.lang.NullPointerException:org.springframework.batch.item.database.HibernateCursoritemReader.DoRead处为null(HibernateCursoritemReader.java:155)
Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?