当前位置: 首页 > 知识库问答 >
问题:

Spring批处理和多线程步骤

洪鸿博
2023-03-14

我目前正在处理一批数据,这些数据来自一个拥有数百万行的大型SQL数据库。

它在处理器中执行一些处理,包括通过带有连接的大型sql查询对从Reader检索到的行进行分组。

编写器将结果写入另一个表。

问题是此Batch存在性能问题,因为Sql选择查询需要大量时间并且步骤不会在多线程执行

因此,我希望在多标题中运行它们,但问题是,这些步骤通过计算具有相同类型的所有行的总数来对行进行分组。

因此,如果我将它放在多标题中,当每个分区将在不同的线程中处理时,我如何做到这一点,因为我知道有数百万行无法存储在上下文中,无法在步骤之后检索它们并进行分组。我也无法将它们保存在数据库中,因为它有数百万行。你知道我如何做到这一点吗?我希望我能很好地解释我的问题。提前感谢您的帮助

共有1个答案

顾乐心
2023-03-14

我有过像你类似的任务,不喜欢我们使用java 1.7和spring 3.x。我可以在xml中提供配置,所以也许你可以使用注释配置,我没有尝试过。

<batch:job id="dualAgeRestrictionJob">
    <-- use a listner if you need -->
    <batch:listeners>
        <batch:listener ref="dualAgeRestrictionJobListener" />
    </batch:listeners>
    <!-- master step, 10 threads (grid-size) -->
    <batch:step id="dualMasterStep">
        <partition step="dualSlaveStep"
            partitioner="arInputRangePartitioner">
            <handler grid-size="${AR_GRID_SIZE}" task-executor="taskExecutor" />
        </partition>
    </batch:step>   
</batch:job>
<-- here you define your reader processor and writer and the commit interval -->
<batch:step id="dualSlaveStep">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="arInputPagingItemReader"
            writer="arOutputWriter" processor="arInputItemProcessor"
            commit-interval="${AR_COMMIT_INTERVAL}" />
    </batch:tasklet>
</batch:step>
<!-- The partitioner -->
<bean id="arInputRangePartitioner" class="com.example.ArInputRangePartitioner">
    <property name="arInputDao" ref="arInputJDBCTemplate" />
    <property name="statsForMail" ref="statsForMail" />
</bean>
<bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="${AR_CORE_POOL_SIZE}" />
    <property name="maxPoolSize" value="${AR_MAX_POOL_SIZE}" />
    <property name="allowCoreThreadTimeOut" value="${AR_ALLOW_CORE_THREAD_TIME_OUT}" />
</bean>
<bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="kvrDatasource" />
</bean>

分区器进行查询以计算行并为每个线程生成块:

public class ArInputRangePartitioner implements Partitioner {
    
    private static final Logger logger = LoggerFactory.getLogger(ArInputRangePartitioner.class);

    private ArInputDao arInputDao;
    
    private StatsForMail statsForMail;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        // You can make a query and then divede the from to for each thread
        Map<Integer,Integer> idMap = arInputDao.getOrderIdList();
        Integer countRow = idMap.size();
        statsForMail.setNumberOfRecords( countRow );  
        Integer range = countRow / gridSize;
        Integer remains = countRow % gridSize;
        int fromId = 1;
        int toId = range;
        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            if(i == gridSize) {
                toId += remains;
            }
            logger.info("\nStarting : Thread {}", i);
            logger.info("fromId : {}", idMap.get(fromId) );
            logger.info("toId : {}", idMap.get(toId) );
            value.putInt("fromId", idMap.get(fromId) );
            value.putInt("toId", idMap.get(toId) );
            value.putString("name", "Thread" + i);
            result.put("partition" + i, value);
            fromId = toId + 1;
            toId += range;
        }
        return result;
    }
    
    public ArInputDao getArInputDao() {
        return arInputDao;
    }

    public void setArInputDao(ArInputDao arInputDao) {
        this.arInputDao = arInputDao;
    }

    public StatsForMail getStatsForMail() {
        return statsForMail;
    }

    public void setStatsForMail(StatsForMail statsForMail) {
        this.statsForMail = statsForMail;
    }

}

这是读取器和写入器的配置:

<bean id="arInputPagingItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step" >
    <property name="dataSource" ref="kvrDatasource" />
    <property name="queryProvider">
        <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean" >
            <property name="dataSource" ref="kvrDatasource" />
            <property name="selectClause" value="${AR_INPUT_PAGING_ITEM_READER_SELECT}" />
            <property name="fromClause" value="${AR_INPUT_PAGING_ITEM_READER_FROM}" />          <property name="whereClause" value="${AR_INPUT_PAGING_ITEM_READER_WHERE}" />
            <property name="sortKey" value="${AR_INPUT_PAGING_ITEM_READER_SORT}" />
        </bean>
    </property>
    <!-- Inject via the ExecutionContext in rangePartitioner -->
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
    <property name="pageSize" value="${AR_PAGE_SIZE}" />
    <property name="rowMapper" ref="arOutInRowMapper" />
</bean>
<bean id="arOutputWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter"
        scope="step">
    <property name="dataSource" ref="kvrDatasource" />
    <property name="sql" value="${SQL_AR_OUTPUT_INSERT}"/>
    <property name="itemSqlParameterSourceProvider">
        <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>

也许有人知道如何用现代Spring批次/Spring靴来转换它

PS:不要使用很多线程,否则Spring批次会浪费很多时间来填充它自己的表。您必须进行一些基准测试才能了解正确的配置

 类似资料:
  • 我在批处理作业中使用多线程步骤来处理来自源数据库的记录并写入目标数据库。该步骤基于块,由JdbcpagingItemReader、Processor和JDBCBathItemWriter组成。我明白,如果在步骤处理期间发生任何异常,数据库事务将回滚整个块。我想了解一下Spring batch在内部是如何管理的?由于这是多线程步骤,因此不能保证处理器和写入器在块的同一线程中执行。块可能由不同的线程处

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 我有一个Spring Boot 1.5应用程序与Spring Batch 3.0.7和Java8。我最近收到了一些连接超时,当一个计划作业试图从5个线程开始时,而另一个长时间运行的批处理作业正在运行。似乎有15个线程合并的连接争用。我没有找到任何留档、博客或问题,似乎解决了Spring Batch中线程和池的相关性。 我使用HikariCP有3个连接,每个数据源配置为默认值(10个连接): bat

  • 以下是我的步骤: 它工作得很好。 现在,我需要处理bt“chuck专用线程”。 我添加了以下配置: 问题出现在这里,因为我收到了这样的信息: 或者 我相关的是: 我的数据源是: 和财产: 有什么想法吗?

  • 我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?