当前位置: 首页 > 知识库问答 >
问题:

Spring批处理-在多线程步骤中读一条记录写多条记录

方博
2023-03-14

我的计划是

  1. 使用多线程步骤,以便每个线程读取一条记录-在处理器中生成多条记录-将生成的记录写入单独的excel文件。
  2. 使用同步读取器从进程表中读取。
  3. 在处理器中,使用读取器中返回的记录查询DB(涉及多个联接)并形成一个复合对象。
  4. 自定义编写器将复合对象写入文件

就内存管理而言,上面的方法听起来不太好。
因为要写入的记录是在处理器中生成的(而不是从读取器那里获得的,读取器只是给出记录ID),所以只有在所有处理完成后,我才能提交。
如果是多个线程,那么在写入之前,内存中会有太多这样的大对象。

共有1个答案

孟福
2023-03-14

您可以基于process_id使用Partitioner(首先获取所有唯一的进程,并在您的自定义分区器中),并在reader中激发mutliple查询(尽管大多数DBA不同意这一点,并且可能会在db中使用tempspace)

这里是一个原型

<batch:step id="step1.slave">
    <batch:tasklet>
        <batch:chunk reader="jdbcCursorItemReader" writer="csvItemWriter" commit-interval="${app.max_number_of_rows}">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<bean id="jdbcCursorItemReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
        <property name="sql">
            <value>
                select *
                from blah blah
                where process_id=?
            </value>
        </property>
        <property name="preparedStatementSetter" ref="myPsSetter" />
</bean>

<bean id="myPsSetter" class="com.kp.swasthik.ps.BeamBalancePsSetter"
        scope="step">
        <property name="processId" value="#{stepExecutionContext['processId']}"></property>
</bean>

以及如何编写自定义分区器请参阅以下链接

 类似资料:
  • 我使用的是spring批处理,和通常使用的一样,我有读取器、处理器和写入器。 我有两个问题 1>Reader查询所有200条记录(表中记录总大小为200,我给出了pageSize=200),因此它得到所有200条记录,在处理器中,我们需要所有这些记录的列表,因为我们必须将每个记录与其他199条记录进行比较,以便将它们分组在不同的层中。因此我在想,如果我们能在处理步骤中得到那个列表,我就可以操纵它们

  • 我希望我的Spring批处理应用程序一次从数据库中读取50条记录,然后将这50条记录发送给处理器,然后发送给写入器。 有人可以告诉我如何做到这一点吗? 我尝试使用JdbcPagingItemReader并将pageSize设置为50,这样可以读取50条记录,但是rowMapper、处理器和编写器一次接收一条记录,而不是获得50条记录。 如何使处理器和写入器在dto中获得50条记录,而不是一次接收一

  • 我目前正在处理一批数据,这些数据来自一个拥有数百万行的大型SQL数据库。 它在处理器中执行一些处理,包括通过带有连接的大型sql查询对从Reader检索到的行进行分组。 编写器将结果写入另一个表。 问题是此Batch存在性能问题,因为Sql选择查询需要大量时间并且步骤不会在多线程中执行。 因此,我希望在多标题中运行它们,但问题是,这些步骤通过计算具有相同类型的所有行的总数来对行进行分组。 因此,如

  • 所以我只使用Jooq来构建查询,而不是执行查询,如下所示: 对象可以执行查询conn.asyncExecute(org.jooq.query query)。所以我的问题是,如何创建类型为org.jooq的批插入查询。查询?具体来说,给定一个列表 请注意,我知道其他问题询问如何使用Jooq进行批处理插入,但他们使用Jooq执行查询就像下面Jose Martinez的回答一样,而这里我只使用Jooq构

  • 我在批处理作业中使用多线程步骤来处理来自源数据库的记录并写入目标数据库。该步骤基于块,由JdbcpagingItemReader、Processor和JDBCBathItemWriter组成。我明白,如果在步骤处理期间发生任何异常,数据库事务将回滚整个块。我想了解一下Spring batch在内部是如何管理的?由于这是多线程步骤,因此不能保证处理器和写入器在块的同一线程中执行。块可能由不同的线程处

  • 我有一个spring批处理作业,它接收平面文件,处理记录,并将输出写入另一个平面文件。我分别使用了和作为读写器。然而,当我尝试实现多线程步骤时,我的工作无法正常工作。我在日志文件中收到以下警告 你能帮我实现多线程步骤吗?