当前位置: 首页 > 知识库问答 >
问题:

我应该如何使用。tasklet()/.chunk()成功地完成作业?

蒲德曜
2023-03-14

我使用Spring Batch将表从源数据库克隆到目标数据库。使用joblauncher和传递参数从服务层手动启动作业。

一切都很好,但是使用当前配置(下面),在步骤描述中使用.chunk(10)时,我只克隆了10行,并且由:java.sql.sqlexception:Result set已经关闭异常引起。

@Configuration
@EnableBatchProcessing
public class DatasetProcessingContext {

    private static final String OVERRIDEN_BY_JOB_PARAMETER = null;
    private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep";
    private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob";

    public static final String SUBSYSTEM = "subsystem";
    public static final String SQL = "sql";
    public static final String SOURCE_DATASOURCE = "sourceDatasource";
    public static final String INSERT_QUERY = "insertQuery";
    public static final String TARGET_DATASOURCE = "targetDatasource";

    @Autowired
    @Qualifier(DEV_DATA_SOURCE)
    private DataSource devDataSource;

    //set of datasources

    @Autowired
    private PlatformTransactionManager transactionManager;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters;

    @Autowired
    private JobBuilderFactory jobsFactory;

    @Autowired
    private StepBuilderFactory stepsFactory;

    @Bean
    public JobRepository jobRepository() throws Exception {
        return new MapJobRepositoryFactoryBean(transactionManager).getObject();
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

    @Bean
    public static StepScope stepScope() {
        return new StepScope();
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader jdbcReader(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                       @Value("#{jobParameters['" + SQL + "']}") String sql,
                                       @Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {

        JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
        jdbcCursorItemReader.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource)));
        jdbcCursorItemReader.setSql(sql);
        jdbcCursorItemReader.setRowMapper((RowMapper) tableMessageDataRowMappers
                .get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER));

        return jdbcCursorItemReader;
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemWriter jdbcWriter(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                 @Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
                                 @Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {

        JdbcBatchItemWriter jdbcWriter = new JdbcBatchItemWriter();
        jdbcWriter.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource)));
        jdbcWriter.setSql(insertQuery);
        jdbcWriter.setItemPreparedStatementSetter(messageDataPreparedStatementSetters
                .get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER));

        return jdbcWriter;
    }

    @Bean
    @SuppressWarnings("unchecked")
    public Step datasetProcessingStep() {

        return stepsFactory.get(DATASET_PROCESSING_STEP)
                // should I create Tasklet or chunk with some CompletionPolicy?
                .chunk(10)
                .reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Job datasetProcessingJob() {

        return jobsFactory.get(DATASET_PROCESSING_JOB).start(datasetProcessingStep()).build();
    }

共有1个答案

钱和平
2023-03-14

在步骤描述中使用.chunk(new DefaultResultCompletionPolicy())适合我的情况。如果null-result-than ResultSet结束,此策略将从iscomplete(RepeatContext context,RepeatStatus result)返回true

 类似资料:
  • 我使用的是Symfony的作曲家,我发现在我安装或更新任何新的包后,通过运行它会将东西复制到相应的文件夹,但在完成这个过程之前会抛出一些错误。 加载具有包信息更新依赖项的composer存储库-安装KNPLAB/knp组件(开发主机163308e)克隆163308ed3442e7e9ec4a45ff912664e366954c82 安装knplabs/knp-page inator-bundle(

  • 在C#中使用时,一般的规则是避免,因为这几乎是一个错误,如果方法没有发送返回值,则应该使用。有道理。奇怪的是,在本周早些时候,我为我编写的一些方法编写了一些单元测试,并注意到NUnit建议将测试标记为或返回。然后我试了一下,果然奏效了。这似乎真的很奇怪,因为nunit框架如何能够运行该方法并等待所有异步操作完成?如果它返回Task,它可以等待任务,然后做它需要做的事情,但是如果它返回void,它如

  • 我有这个,它将某个类解析为另一个类: 我的意思是,我可以使用类似这样的东西来完成这件事吗?

  • 问题内容: 我正在尝试关注nodetuts.com的第3集。另外,我使用的是节点的最新(不稳定)版本- node.exe,版本0.5.2。这是我的代码,几乎整天以来,我都在不断碰到这个错误。只是窗户上的东西吗? 谢谢! 问题答案: 0.5.x在Windows上有错误。你可以做 我相信0.6将解决这些问题。:)

  • 问题内容: 我正在尝试实现自动补全功能,但是找不到在Swift中可用的示例。下面,我打算转换Ray Wenderlich的自动完成教程 和2010年的示例代码。最后,代码进行了编译,但是没有显示包含可能完成的表格,而且我没有经验来了解为什么它未被隐藏shouldChangeCharactersInRange。 问题答案: 用下面的内容替换您的函数内容。希望对您有帮助。

  • 问题内容: 我是使用USB SSH插件在Linux服务器上运行命令的人,我使用SSH启动了一个脚本,该脚本花费了超过1个小时的时间才能运行,我不想等待那么多时间,因此我将30秒钟的时间设置为Exec超时。但是它显示错误ERROR:发布时发生异常,异常消息[Exec超时或在30,000 ms后被中断],并且不稳定。我不想使我的构建不稳定,因为我使用了詹金斯文本查找器来稳定该构建。在那个詹金斯文本查找