当前位置: 首页 > 知识库问答 >
问题:

Spring批处理分区:连续(不并行)运行从属程序

凌和悦
2023-03-14

我使用spring batch来执行一些calcul,在reader中,我必须获得一个大数据,以便在处理器/写入器中处理,这个过程需要大量的(RAM)。因此,我尝试使用分区器拆分步骤,如下所示:

<batch:step id="MyStep.master" >
    <partition step="MyStep" partitioner="MyPartitioner">
        <handler grid-size="1" task-executor="TaskExecutor" />
    </partition>
</batch:step>

<batch:step id="MyStep" >
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="MyReader" processor="MyProcessor"
            writer="MyWriter" commit-interval="1000" skip-limit="1000">
            <batch:skippable-exception-classes>
                <batch:include class="...FunctionalException" />
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<bean id="MyPartitioner" class="...MyPartitioner" scope="step"/>

<bean id="TaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >

<bean name="MyReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql">
        <value>
            <![CDATA[
                SELECT...                   
            ]]>
        </value>
    </property>
    <property name="rowMapper" ref="MyRowMapper" />
</bean>

<bean id="MyRowMapper" class="...MyRowMapper" />

<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
    <property name="driverClass" value="org.postgresql.Driver"/>
    <property name="jdbcUrl" value="jdbc:postgresql://${database.host}/${database.name}"/>
    <property name="user" value="${database.user}"/>
    <property name="password" value="${database.password}"/>        
    <property name="acquireIncrement" value="1" />
    <property name="autoCommitOnClose" value="true" />
    <property name="minPoolSize" value="${min.pool.size}" /> <!-- min.pool.size=5  -->
    <property name="maxPoolSize" value="${max.pool.size}" /> <!-- max.pool.size=15  -->
</bean>

但是分区也徒劳地占用了大量内存,因为步骤(从机)是并行执行的,我想做的是拆分步骤并连续执行线程(而不是并行执行)以减少内存使用(RAM),这可能吗?

共有1个答案

邬弘化
2023-03-14

这个问题有点老了,所以我不确定这是否有帮助,可能你自己解决了。

如果行执行顺序没有问题,那么解决方案是查询分区器bean中的db,然后将所有信息传递给每个分区,以便将表/秒(start_key,end_key)分成几个部分。这将大大减少ram的使用。

一些警告:

    null
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- JOB -->
    <batch:job id="printPdf" job-repository="jobRepository"
        restartable="false">

        <batch:step id="MyStep">
            <batch:partition step="MyStep.template"
                partitioner="myPartitioner" handler="partitionHandler">
            </batch:partition>
        </batch:step>

    </batch:job>

    <!-- Partitioner -->
    <bean id="myPartitioner" class="foo.MyPartitioner"
        scope="step">
        <property name="jdbcTemplate" ref="myJdbcTemplate" />
        <property name="sql"
            value="Select ...." />
        <property name="rowMap">
            <bean
                class="foo.MyPartitionHandlerRowMapper" />
        </property>
        <property name="preparedStatementSetter">
            <bean
                class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <value>#{jobParameters['param1']}</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="partitionHandler" scope="step"
        class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
        <property name="taskExecutor" ref="customTaskExecutor" />
        <property name="gridSize" value="#{jobParameters['gridSize']}" />
        <property name="step" ref="MyStep.template" />
    </bean>

    <bean id="customTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="8" />
        <property name="maxPoolSize" value="8" />
        <property name="waitForTasksToCompleteOnShutdown" value="true" />
        <property name="awaitTerminationSeconds" value="120" />
    </bean>

    <batch:step id="MyStep.tempate">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk commit-interval="2500" reader="myReader"
                processor="myProcessor" writer="myWriter" skip-limit="2500">
            <batch:skippable-exception-classes>
                <batch:include class="...FunctionalException" />
            </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>

    <!-- Beans -->

    <!-- Processors -->
    <bean id="myProcessor" class="foo.MyProcessor"
        scope="step">
    </bean>

    <bean id="classLoaderVerifier"
        class="it.addvalue.pkjwd.services.genbean.GenericStockKeysForNoDuplicate" />

    <!-- Readers -->
    <bean id="myReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"
        scope="step">
        <property name="dataSource" ref="myDataSouce" />
        <property name="sql"
            value="select ... from ... where ID >= ? and ID <= ?" />
        <property name="rowMapper">
            <bean class="foo.MyReaderPartitionedRowMapper" />
        </property>
        <property name="preparedStatementSetter">
            <bean
                class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <value>#{stepExecutionContext['START_ID']}</value>
                        <value>#{stepExecutionContext['END_ID']}</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Writers -->
    <bean id="myWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="assertUpdates" value="false" />
        <property name="itemPreparedStatementSetter">
            <bean class="foo.MyWriterStatementSetters" />
        </property>
        <property name="sql"
            value="insert ..." />
        <property name="dataSource" ref="myDataSouce" />
    </bean>

</beans>

您的分区器Bean如下所示:

package foo;

import foo.model.MyTable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;

public class MyPartitioner implements Partitioner
{
    private JdbcTemplate                     jdbcTemplate;

    private RowMapper<foo.model.MyTable> rowMap;

    private String                           sql;

    private PreparedStatementSetter          preparedStatementSetter;

    public JdbcTemplate getJdbcTemplate()
    {
        return jdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate)
    {
        this.jdbcTemplate = jdbcTemplate;
    }

    public RowMapper<foo.model.MyTable> getRowMap()
    {
        return rowMap;
    }

    public void setRowMap(RowMapper<PkjwdPolizzePartition> rowMap)
    {
        this.rowMap = rowMap;
    }

    public String getSql()
    {
        return sql;
    }

    public void setSql(String sql)
    {
        this.sql = sql;
    }

    public PreparedStatementSetter getPreparedStatementSetter()
    {
        return preparedStatementSetter;
    }

    public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter)
    {
        this.preparedStatementSetter = preparedStatementSetter;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

        try
        {
            List<PkjwdPolizzePartition> lstMyRows = jdbcTemplate.query(sql, preparedStatementSetter ,rowMap);

            if ( lstMyRows.size() > 0 )
            {
                int total = lstMyRows.size();
                int rowsPerPartition = total / gridSize;
                int leftovers = total % gridSize;
                total = lstMyRows.size() - 1;

                int startPos = 0;
                int endPos = rowsPerPartition - 1;

                int i = 0;

                while (endPos <= (total))
                {

                    ExecutionContext context = new ExecutionContext();

                    if ( endPos + leftovers == total )
                    {
                        endPos = total;
                    }
                    else if ( endPos >= (total) )
                    {
                        endPos = total;
                    }

                    context.put("START_ID", lstMyRows.get(startPos).getId());
                    context.put("END_ID", lstMyRows.get(endPos).getId());



                    map.put("PART_" + StringUtils.leftPad("" + i, ("" + gridSize).length(), '0'), context);

                    i++;
                    startPos = endPos + 1;
                    endPos = endPos + rowsPerPartition;
                }
            }

        }
        catch ( Exception e )
        {
            e.printStackTrace();
        }

        return map;
    }

}
 类似资料:
  • 我们有一个用例,需要从一些分页的API读取数据,然后写入一些下游的Kafka主题。 我们已经能够通过Spring批处理集成远程分区来实现解决方案,其中管理器通过创建包含页码和偏移量以读取数据的执行上下文来处理任务的分区。管理器创建此执行上下文并将它们放在MessagingChannel上(我可以使用RabbitMQ和Kafka主题,以提供解决方案者为准)。工作人员(超过1个)从MessagingC

  • 我试图配置我的第一个多线程作业。我们有大约200,000条记录的主目录,我们需要处理。我想将文件分解为10个文件并处理它们。拆分文件tasklet工作正常 主步骤在我的配置中运行,但从步骤不运行。下面是我的配置。 分割者: MultiResourceItemReader: FlatFileItemWriter: 作业配置: 从属步骤配置: 请告知我做错了什么。我没有看到处理器urlFileItem

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

  • 我在我的项目中集成了Spring Batch,我在运行JobLauncher时遇到了问题。 在我的类JobLauncher我有这个: 对于配置,我使用XML配置: 配置批处理。xml: 在作业配置中。我有: 当我在类JobLauncher中调试时,它会在jobLuancher中停止。运行,我也不例外,似乎SpringBatch无法识别reader和whriter!!有什么建议吗?