当前位置: 首页 > 知识库问答 >
问题:

Spring批处理:多线程步进错误配置

蒙胤
2023-03-14

以下是我的步骤:

@Bean
public Step autors(
    ItemReader<Autor> autorItemReader,
    AutorMappingItemProcessor processor,
    AutorPipeliningItemWriter unitatPipeliningWriter
) {
    return this.stepBuilderFactory
        .get("autors")
        .<Autor, AutorDenormalized>chunk(100)
        .reader(autorItemReader)
        .processor(processor)
        .writer(unitatPipeliningWriter)
        .build();
}

它工作得很好。

现在,我需要处理bt“chuck专用线程”。

我添加了以下配置:

@Bean
public Step autors(
    ItemReader<Autor> autorItemReader,
    AutorMappingItemProcessor processor,
    AutorPipeliningItemWriter unitatPipeliningWriter,
    TaskExecutor taskExecutor
) {
    return this.stepBuilderFactory
        .get("autors")
        .<Autor, AutorDenormalized>chunk(100)
        .reader(autorItemReader)
        .processor(processor)
        .writer(unitatPipeliningWriter)
        .taskExecutor(taskExecutor)
        .throttleLimit(4)
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

问题出现在这里,因为我收到了这样的信息:

HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)

com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.

或者

HikariPool-2 - Connection ConnectionID:1 ClientConnectionId: 8bee2b6e-d88e-4831-b9e5-163b52dca86c marked as broken because of SQLSTATE(08S01), ErrorCode(0)

com.microsoft.sqlserver.jdbc.SQLServerException: The TDS protocol stream is not valid.

我相关的ItemReader是:

@Bean
public ItemReader<Autor> autorReader() {
    String sql = "select * from ...";

    JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
    jdbcCursorItemReader.setDataSource(this.dataSource);
    jdbcCursorItemReader.setSql(sql);
    jdbcCursorItemReader.setVerifyCursorPosition(false);
    jdbcCursorItemReader.setRowMapper(this.autorMapper);

    return jdbcCursorItemReader;
}

我的数据源是:

@Bean
@JobDataSource
@ConfigurationProperties(prefix = "spring.job-datasource")
public DataSource jobDataSource() {
    return DataSourceBuilder.create().build();
}

和财产:

spring.job-datasource.jdbcUrl=jdbc:sqlserver://localhost;databaseName=ac_img_p
spring.job-datasource.username=sa
spring.job-datasource.password=StR0nGp4ss.
spring.job-datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
spring.job-datasource.initialization-mode=always

有什么想法吗?

共有1个答案

闽康安
2023-03-14

JdbcCursorItemReader不是线程安全的,因为它扩展了不是线程安全的AbstractItemCounting ItemStreamItemReader。因此在多线程步骤中使用它是不正确的。您可以做的是用SynChronizedItemStreamReader装饰它:

@Bean
public SynchronizedItemStreamReader<Autor> autorReader() {
   String sql = "select * from ...";

   JdbcCursorItemReader<Autor> jdbcCursorItemReader = new JdbcCursorItemReader<>();
   jdbcCursorItemReader.setDataSource(this.dataSource);
   jdbcCursorItemReader.setSql(sql);
   jdbcCursorItemReader.setVerifyCursorPosition(false);
   jdbcCursorItemReader.setRowMapper(this.autorMapper);

   SynchronizedItemStreamReader<Autor> synchronizedReader = new SynchronizedItemStreamReader<>();
   synchronizedReader.setDelegate(jdbcCursorItemReader)
   return synchronizedReader;
}

否则,您可以使用线程安全读取器,例如JdbcPagingItemReader

作为补充说明,您的autoreader方法应该返回实际类型,或者至少返回ItemStreamReader

 类似资料:
  • 我目前正在处理一批数据,这些数据来自一个拥有数百万行的大型SQL数据库。 它在处理器中执行一些处理,包括通过带有连接的大型sql查询对从Reader检索到的行进行分组。 编写器将结果写入另一个表。 问题是此Batch存在性能问题,因为Sql选择查询需要大量时间并且步骤不会在多线程中执行。 因此,我希望在多标题中运行它们,但问题是,这些步骤通过计算具有相同类型的所有行的总数来对行进行分组。 因此,如

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我在批处理作业中使用多线程步骤来处理来自源数据库的记录并写入目标数据库。该步骤基于块,由JdbcpagingItemReader、Processor和JDBCBathItemWriter组成。我明白,如果在步骤处理期间发生任何异常,数据库事务将回滚整个块。我想了解一下Spring batch在内部是如何管理的?由于这是多线程步骤,因此不能保证处理器和写入器在块的同一线程中执行。块可能由不同的线程处

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 我尝试在下面的步骤中使用多线程,但下面出现了一个异常: 我的步骤:代码: 公共类SynchronizedItemStreamReader实现ResourceAwareItemReaderItemStream{ 谢谢!!!

  • 我想编写一个spring boot批处理应用程序,其中我有一个充满事件的数据库表。我想做的是有一个多线程的spring boot批处理应用程序,它将以这种方式工作: 我想有5个线程运行,每个线程将保留一个偏移量来跟踪它读取的事件,以便没有其他线程再次读取相同的事件。我想怎么做: 所以我希望能够在数据库表中为每个线程保留偏移量。有没有办法让Spring Boot环境以这种方式工作?