当前位置: 首页 > 知识库问答 >
问题:

SimpleAsyncTaskExecutor的Spring批处理未保存到DB

许曦
2023-03-14

我有一个简单的Spring启动应用程序,它带有一个enpoint,通过JobLauncher bean中配置的SimpleAsynctaskeExecutor异步调用Spring批处理作业。

Spring批处理作业异步启动,工作正常,但没有任何内容保存到数据库中。

如果我删除SimpleAsyncTaskExecutor,数据就会保存。

这是我的批处理配置程序。我用SimpleAsyncTaskExecutor配置JobLauncher。如果我删除simplejoblancher这行。setTaskExecutor(新的SimpleAsyncTaskExecutor());//(1) 数据被保存。

@Component
public class CustomBatchConfiguration implements BatchConfigurer {

private static final Log LOGGER = LogFactory.getLog(CustomBatchConfiguration.class);

@Autowired
private BatchProperties properties;

@Autowired
private DataSource dataSource;

@Autowired
private EntityManagerFactory entityManagerFactory;

private PlatformTransactionManager transactionManager;

private JobRepository jobRepository;

private JobLauncher jobLauncher;

private JobExplorer jobExplorer;

/**
 * Registers {@link JobRepository} bean.
 */
@Override
public JobRepository getJobRepository() {
    return this.jobRepository;
}

/**
 * Registers {@link PlatformTransactionManager} bean.
 */
@Override
public PlatformTransactionManager getTransactionManager() {
    return this.transactionManager;
}

/**
 * Registers {@link JobLauncher} bean.
 */
@Override
public JobLauncher getJobLauncher() {
    return this.jobLauncher;
}

/**
 * Registers {@link JobExplorer} bean. This bean is actually created in
 * {@link BatchConfig}.
 */
@Override
public JobExplorer getJobExplorer() throws Exception {
    return this.jobExplorer;
}

/**
 * Initializes Spring Batch components.
 */
@PostConstruct
public void initialize() {
    try {
        this.transactionManager = createTransactionManager();
        this.jobRepository = createJobRepository();
        this.jobLauncher = createJobLauncher();
        this.jobExplorer = createJobExplorer();
    } catch (Exception ex) {
        throw new IllegalStateException("Unable to initialize Spring Batch", ex);
    }
}

private JobExplorer createJobExplorer() throws Exception {
    JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
    jobExplorerFactoryBean.setDataSource(this.dataSource);
    String tablePrefix = this.properties.getTablePrefix();

    if (StringUtils.hasText(tablePrefix)) {
        jobExplorerFactoryBean.setTablePrefix(tablePrefix);
    }

    jobExplorerFactoryBean.afterPropertiesSet();
    return jobExplorerFactoryBean.getObject();
}

private JobLauncher createJobLauncher() throws Exception {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();

    simpleJobLauncher.setJobRepository(getJobRepository());
    simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // (1)
    simpleJobLauncher.afterPropertiesSet();

    return simpleJobLauncher;
}

private JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();

    jobRepositoryFactoryBean.setDatabaseType("db2");
    jobRepositoryFactoryBean.setDataSource(this.dataSource);

    if (this.entityManagerFactory != null) {
        LOGGER.warn("JPA does not support custom isolation levels, so locks may not be taken when launching Jobs");
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    }

    String tablePrefix = this.properties.getTablePrefix();
    if (StringUtils.hasText(tablePrefix)) {
        jobRepositoryFactoryBean.setTablePrefix(tablePrefix);
    }

    jobRepositoryFactoryBean.setTransactionManager(getTransactionManager());
    jobRepositoryFactoryBean.afterPropertiesSet();

    return jobRepositoryFactoryBean.getObject();
}

private PlatformTransactionManager createTransactionManager() {
    return new DataSourceTransactionManager(dataSource);
}

}

这是我的配置。

@Autowired(required = true)
private MyItemReader myItemReader;

@Autowired(required = true)
private MyItemProcessor myItemProcessor;

@Autowired(required = true)
private MyItemWriter myItemWriter;


@Bean
public Step myStep(TaskExecutor taskExecutor) { 

    return stepBuilderFactory.get("myStepName")
            .<SomeWrapper, SomeWrapper>chunk(
                    1)
            .reader(myItemReader)
            .processor(myItemProcessor).writer(myItemWriter)

            .build();
}

@Bean(name = "myJob")
public Job myJob(Step myStep) {
    return jobBuilderFactory.get("myJobName").incrementer(new RunIdIncrementer())
            .flow(myStep).end().build();
}

我错过了什么?

提前谢谢

共有1个答案

黄和怡
2023-03-14

我做了以下来解决我的问题:

>

private PlatformTransactionManager createTransactionManager() {        
    return new DataSourceTransactionManager(dataSource);
}

private PlatformTransactionManager createTransactionManager() {
    if (this.entityManagerFactory != null) {
        return new JpaTransactionManager(this.entityManagerFactory);
    }

    return new DataSourceTransactionManager(this.dataSource);
}

我使用了JpaTransactionManager的Spring。

我改变JobLauncher任务执行从

simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
threadPoolExecutor.setMaxPoolSize(springBatchJobThreadPollMaxSize);
threadPoolExecutor.afterPropertiesSet();

TaskExecutor taskExecutor = new DelegatingSecurityContextAsyncTaskExecutor(threadPoolExecutor);

创建一个更健壮的线程池,并在它们之间共享Spring Security上下文。

 类似资料:
  • 我有一个使用JPA的Spring Boot应用程序,它有一个PostgreSQL数据库。我使用的是Spring Batch。场景是我正在读取一个文件并将数据写入PostgreSQL数据库。当程序在数据库中创建Spring Batch使用的元数据表时,它与PostgreSQL一起工作。但我需要的是Spring Boot不要创建元数据表,并通过Spring Batch使用内存中基于映射的作业存储库。我

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我有一个Spring Boot应用程序,通过从kafka收集数据,我正在做很多插入。我希望使用 saveAll 批处理插入来提高性能。但是很少有数据会被认为是重复的,每当在我的代码中捕获DataIntegrityViolationException时,我都会更新它们。使用批处理插入,有没有办法为每个重复的数据捕获此异常,并使用 do 更新代码进行处理?

  • 我有一个compositeItemWriter,它有2个代理编写器:1。HeaderWriter将一些字段从我的对象写入头表2。DetailWriter将文件写入详细表。 context.xml:

  • 我正在使用Spring Batch和JPA处理一个批处理作业并执行更新。我正在使用默认的存储库实现。 并且我正在使用一个repository.save将修改后的对象保存在处理器中。而且,我没有在处理器或编写器中指定任何@Transactional注释。 下面是我的步骤,读取器和写入器配置:另外,我的config类是用EnableBatchProcessing注释的 在writer中,这就是我使用的

  • 我使用的是由java.sql引起的相同代码:na]。SQLSyntaxErrorException:ORA-00942:表或视图不存在-Spring批处理,无法将记录保存到数据库中。 我已经创建了表 LifeCycleStatusWriter.java 作家 错误: 配置详细信息 数据库配置