当前位置: 首页 > 知识库问答 >
问题:

用于聚合值并写入单个值的spring批处理

宗政唯
2023-03-14

我正在使用spring批处理,我需要实现以下内容

  1. 读取包含日期和金额等详细信息的csv文件
  2. 将同一日期的所有金额的总和合计
  3. 保留一个带有日期和总和的条目

我在过去使用过批处理,我想到了下面的方法。用2个步骤创建批处理。

步骤1:

  1. 读取器:使用FlatFileItemReader
  2. 遍历整个文件
  3. 处理器:用键作为日期,值作为金额填充映射。如果存在条目,则获取该值并将其添加到新值
  4. 编写器:没有操作编写器,因为我不想编写

第二步:

  1. 读取器:循环映射的值
  2. 编写器:保留值

我能够实现步骤1,在此填充映射。此映射已使用@jobscope声明

我被困在如何为step2创建阅读器上,它只需要阅读值列表。我尝试了ListItemReader,但无法从ListItemReader访问映射

请给出一个解决方案,或者如果你有更好的方法来解决这个问题

谢谢

共有1个答案

殳飞扬
2023-03-14

选项1:如果您的cvs已经按日期排序,您可以实现一个组读取器,它读取行,直到键值改变为止。之后,整个组可以作为一个项传递给处理器。

这样的群体阅读器可能如下所示:

  private SingleItemPeekableItemReader<I> reader;
  private ItemReader<I> peekReaderDelegate;

  @Override
  public void afterPropertiesSet() throws Exception {
    Assert.notNull(peekReaderDelegate, "The 'itemReader' may not be null");
    this.reader= new SingleItemPeekableItemReader<I>();
    this.reader.setDelegate(peekReaderDelegate);
  }

  @Override
  // GroupDTO is just a simple container. It is also possible to use
  // List<I> instead of GroupDTO<I>
  public GroupDTO<I> read() throws Exception {
    State state = State.NEW; // a simple enum with the states NEW, READING, and COMPLETE
    GroupDTO<I> group = null;
    I item = null;

    while (state != State.COMPLETE) {
      item = reader.read();

      switch (state) {
        case NEW: {
          if (item == null) {
            // end reached
            state = State.COMPLETE;
            break;
          }

          group = new GroupDTO<I>();
          group.addItem(item);
          state = State.READING;
          I nextItem = reader.peek();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        case READING: {
          group.addItem(item);

          // peek and check if there the peeked entry has a new date
          I nextItem = peekEntry();
          // isGroupBreak returns true, if 'item' and 'nextItem' do NOT belong to the same group
          if (nextItem == null || getGroupBreakStrategy.isGroupBreak(item, nextItem)) {
            state = State.COMPLETE;
          }
          break;
        }
        default: {
          throw new org.springframework.expression.ParseException(groupCounter, "ParsingError: Reader is in an invalid state");
        }
      }
    }

    return group;
  }

您需要一个SingleItemPeekableItemReader,以便预读下一个元素。这封封住了你的普通读者。

选项2:第一步如您所建议的,但只需为第二步编写一个任务。不需要使用reader-process-writer方法,而是可以使用一个简单的tasklet将映射的内容写入文件

选项3:如果您真的想在步骤2中使用读取器-处理器-写入器的方法,那么编写您自己的读取器来迭代您的映射。

类似于(我没有测试该代码):

public class MapReader implements ItemReader {

     private MapContainer container;
     private Iterator<Map.Entry<Date, Integer> mapIterator;

     @PostConstruct
     public void afterPropertiesSet() {
        Assert.notNull(container);
        iterator = container.getMap().entry().iterator;
     }

     public void setMapContainer(MapContainer container) {
         this.container = container;
     }

     public Map.Entry<Date, Integer> read() {
        if (iterator.hasNext()) {
           return iterator.next();
        }
        return null;
      }
}

@Component
public class MapContainer {
    private Map<Date, Integer> data = new Hashmap<>();

    public Map<Date, Integer> getMap() {
        return data;
    }

    // add modifier method as needed for step 1

}

因此,您为容器创建一个spring bean实例,将其注入到步骤2的处理器中,在那里填充它,还将其注入到上面的读取器中。

 类似资料:
  • 我正在阅读“Spring MVC,初学者指南”这本书。在一节中,它建议使用矩阵变量来传递一个高价格和低价格。在他们的示例中,声明指定了一个字符串列表作为第二个参数(见下文)。 但是,由于每个键只有一个值,我想我会尝试声明: 当我尝试访问与存储在中的特定键相关联的“value”(使用和时,我不断得到运行时的错误。 当我在debug中检查priceParams时,数据类型显示为,目标映射显示为。 我开

  • 我有一个商业案例,使用Spring batch将多个csv文件(每个文件大约1000个,包含1000条记录)合并成单个csv。 请帮助我提供方法和性能方面的指导和解决方案。 到目前为止,我已经尝试了两种方法, 方法1。 Tasklet chunk与multiResourceItemReader一起从目录中读取文件,FlatFileItemWriter作为项目编写器。 这里的问题是,它的处理速度非常

  • 我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢

  • 假设我有以下JSON结构,我希望按性别分组,并希望在同一字段中返回多个文档值: 现在我知道我可以做这样的事情,但是我需要把年龄和名字合并到一个字段中。

  • 我有一个Spring批处理应用程序,它从mysql数据库读取数据,并将其写入csv文件。 问题:当我看到csv中的数据时,我面临的问题是,一些大的数字显示为指数值。例如,对于这个数字“4491611100277480”,它显示为“4.49161E 15” 下面是我正在使用的代码片段

  • 我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目