当前位置: 首页 > 知识库问答 >
问题:

多行聚合ItemReader不按照Spring批处理样本中的建议工作

须鸿祯
2023-03-14

我正在尝试使用多行IteamReader来跟踪spring批处理示例https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline

我遇到如下编译错误-

我确信有一些与泛型相关的东西,因为它在寻找实现ItemReader的类,但是Aggregate ItemReader实现了ItemReader

public class AggregateItemReader<T> implements ItemReader<List<T>> {

你可以在这里找到我的代码-https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java

更新:

为了调用ItemStreamReader open()方法,我抑制了泛型并更新了AggregateItemReader,如下所示。

public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);

private ItemStreamReader<AggregateItem<T>> itemReader;

我注意到ItemWriter正在编写记录列表,而不是每行记录

[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]

当我尝试添加处理器时,它会抱怨处理器无法将列表转换为交易对象。

@Bean
public ItemProcessor<Trade, Trade> processor() {
    return new ItemProcessor<Trade, Trade>() {

        @Override
        public Trade process(Trade item) throws Exception {
            item.setProcessed(true);
            return item;
        }
    };
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
        AggregateItemReader reader, 
        ItemProcessor processor,
        FlatFileItemWriter writer,
        StepItemReadListener itemReadListener) {
    return stepBuilderFactory.get("multiLineStep")
            .chunk(1)
            .reader(reader)
            .writer(writer)
            .processor(processor)
            .build();
}

错误:

java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
    at my.demo.batch.BatchConfiguration$2.process(BatchConfiguration.java:1) ~[main/:na]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.html" target="_blank">springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]

帮助:

  1. 我们如何在不抑制泛型的情况下让它工作

共有1个答案

伏欣悦
2023-03-14

您需要在批次级别上处理的项目类型保持一致。根据您的步骤定义,它是交易。通过调用

问题是你的阅读器是ItemReader类型的

如果您想使用Aggregate ItemReader,您需要将其包装到一个自定义阅读器中,该阅读器作为适配器工作,实际上返回Trade项,而不是List

例如,自定义read方法可以如下所示:

public Trade read() throws Exception {
    if (queue.isEmpty()) {
        List<Trade> trades = aggregateItemReader.read();
        if (trades != null) {
            queue.addAll(trades);
        }
    }
    return queue.poll();
}

队列初始化为

private Deque<Trade> queue = new ArrayDeque<>();

 类似资料:
  • 我是Spring批处理的新手,我只想问如何从多行结果集中检索数据。我有以下场景: > 有两个不同的表说员工 使用时,我只能创建一个工资单子级,但该表可能有多个子级。请帮助...

  • 我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢

  • 我尝试了另一个解决方案,使用自定义的FlatItemReader,我重写了close()方法,在关闭后删除文件。这个解决方案发生了一个奇怪的事情,我的close方法被调用了两次,有些时候我可以删除文件,有些时候我不能删除它... 请参阅下面的日志: 我的close方法的代码: 我的BatchConfig.java: filedeletingTasklet.java

  • 知道为什么下面的代码会出现以下错误吗? java.lang.NullPointerException:org.springframework.batch.item.database.HibernateCursoritemReader.DoRead处为null(HibernateCursoritemReader.java:155)

  • Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?