Question:

Spring Batch - ItemWriter - DataIntegrityViolationException - skip records - retry - not working

於子晋
2023-03-14

I have been stuck on this problem for some time.

I am using Spring Batch 3.0.7.

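The problem: when org.springframework.dao.DataIntegrityViolationException occurs for one record in the ItemWriter, the remaining records in the chunk (chunk size = 10) are not inserted into the database either, even though I have provided a skipPolicy (which returns true for all exceptions).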

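My understanding is that if any exception occurs while committing the whole chunk (and the skipPolicy allows it to be skipped), the current transaction is rolled back. The chunk size (commit interval) is then reset to 1, and each record of the chunk is retried individually to find the faulty record, so that only that record stays out of the database. The remaining records are written to the database.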
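To make that expectation concrete, here is a small plain-Java model of it. It is only a sketch: `ChunkScanModel`, `Writer` and `Result` are hypothetical names invented for illustration, not Spring Batch classes, and the model assumes a skip policy that accepts every exception, like the one shown further down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of the expected skip behaviour; NOT Spring Batch's real implementation. */
final class ChunkScanModel {

    /** Hypothetical stand-in for an item writer; throws when a chunk contains a bad item. */
    interface Writer {
        void write(List<String> items);
    }

    /** Outcome of processing one chunk. */
    static final class Result {
        final List<String> written = new ArrayList<>();
        final List<String> skipped = new ArrayList<>();
    }

    static Result writeChunk(List<String> chunk, Writer writer) {
        Result result = new Result();
        try {
            writer.write(chunk);                    // first attempt: the whole chunk at once
            result.written.addAll(chunk);
        } catch (RuntimeException chunkFailure) {
            // The chunk attempt is treated as rolled back; retry one item at a time.
            for (String item : chunk) {
                try {
                    writer.write(Arrays.asList(item));
                    result.written.add(item);
                } catch (RuntimeException itemFailure) {
                    result.skipped.add(item);       // only the offending item is skipped
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Writer writer = items -> {
            if (items.contains("bad")) {
                throw new IllegalStateException("constraint violation");
            }
        };
        Result r = writeChunk(Arrays.asList("a", "bad", "c"), writer);
        System.out.println("written=" + r.written + " skipped=" + r.skipped);
        // prints: written=[a, c] skipped=[bad]
    }
}
```

Note that this model only reacts to exceptions thrown from inside `write`. An exception raised later, when the transaction commits, never reaches the `catch` block.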

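But that is not happening. The chunk transaction is rolled back, but the individual records are not retried.

I also set breakpoints in the skipPolicy (shouldSkip method) and the skipListener (onSkipInWrite method), but the flow never reaches them.

I am using CrudRepository and Hibernate as the DAO layer (the same code is called from the writer).

Here is my code: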

/**
 * @return processCRMRecords
 */
@Bean
public Step processCRMRecordsStep() {
    return stepBuilderFactory.get("crm-sr-files-processing-step")
            .<CRMSrRecord, SRRecordEO>chunk(10)
            .reader(crmSrMultiFileReader(null, null))
            .processor(crmSrRecordProcessor())
            .writer(crmRecordWriter())
            .faultTolerant()
            .skipPolicy(skipSRPolicy())
            .listener(listener)
            .build();
}

/**
 * @return SkipPolicy
 */
@Bean
public SkipPolicy skipSRPolicy() {
    return new RecordVerificationSkipper();
}

Skip policy:

public class RecordVerificationSkipper implements SkipPolicy {

    Logger logger = LoggerFactory.getLogger(RecordVerificationSkipper.class);

    /**
     *
     */
    public RecordVerificationSkipper() {
        //
    }

    /* (non-Javadoc)
     * @see org.springframework.batch.core.step.skip.SkipPolicy#shouldSkip(java.lang.Throwable, int)
     */
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {

        return true;
    }

}

SkipListener:

@Override
public void onSkipInWrite(S item, Throwable t) {
    if (item instanceof IRecordString) {
        logger.info(((IRecordString) item).getRecordAsString());
    }
}

Exception to be skipped:

org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [null]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:278)
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:244)
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:521)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy70.commit(Unknown Source)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy73.run(Unknown Source)
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:214)
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:231)
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:123)
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:117)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:732)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:716)
    at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:703)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:304)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
    at com.dbs.app.batch.BatchAplication.main(BatchAplication.java:19)
Caused by: org.hibernate.exception.ConstraintViolationException: could not execute statement
    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:59)
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:207)
    at org.hibernate.engine.jdbc.batch.internal.NonBatchingBatch.addToBatch(NonBatchingBatch.java:45)
    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:2949)
    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3449)
    at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:89)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:582)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:456)
    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:337)
    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:39)
    at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1282)
    at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:465)
    at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:2963)
    at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2339)
    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:485)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:147)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$100(JdbcResourceLocalTransactionCoordinatorImpl.java:38)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:231)
    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:65)
    at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:61)
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517)
    ... 53 common frames omitted
Caused by: java.sql.SQLIntegrityConstraintViolationException: (conn:554184) Cannot add or update a child row: a foreign key constraint fails (`domain`.`table_dummy`, CONSTRAINT `DUMMY_CCA_PR_FK9` FOREIGN KEY (`RT_ID`) REFERENCES `type_table_dummy` (`RT_ID`))
Query is: insert into table_dummy (is_new, otherparameo_ot_id, rt_id, status, req_sub_type, created_by, created_datetime, updated_by, updated_datetime, unique_ref_num, rst_id) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), parameters [0,<null>,'test123','A','aaaa','qqqq','2017-10-06 17:20:51.845',<null>,<null>,'test123','test123']
    at org.mariadb.jdbc.internal.util.ExceptionMapper.get(ExceptionMapper.java:133)
    at org.mariadb.jdbc.internal.util.ExceptionMapper.getException(ExceptionMapper.java:101)
    at org.mariadb.jdbc.internal.util.ExceptionMapper.throwAndLogException(ExceptionMapper.java:77)
    at org.mariadb.jdbc.MariaDbStatement.executeQueryEpilog(MariaDbStatement.java:226)
    at org.mariadb.jdbc.MariaDbServerPreparedStatement.executeInternal(MariaDbServerPreparedStatement.java:413)
    at org.mariadb.jdbc.MariaDbServerPreparedStatement.execute(MariaDbServerPreparedStatement.java:362)
    at org.mariadb.jdbc.MariaDbServerPreparedStatement.executeUpdate(MariaDbServerPreparedStatement.java:351)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:204)
    ... 72 common frames omitted
Caused by: org.mariadb.jdbc.internal.util.dao.QueryException: Cannot add or update a child row: a foreign key constraint fails (`appid`.`table_dummy`, CONSTRAINT `DUMMY_CCA_PR_FK9` FOREIGN KEY (`RT_ID`) REFERENCES `type_table_dummy` (`RT_ID`))
Query is: insert into table_dummy (is_new, otherparameo_ot_id, rt_id, status, req_sub_type, created_by, created_datetime, updated_by, updated_datetime, unique_ref_num, rst_id) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), parameters [0,<null>,'test123','A','aaaa','qqqq','2017-10-06 17:20:51.845',<null>,<null>,'test123','test123']
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.readErrorPacket(AbstractQueryProtocol.java:1144)
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.readPacket(AbstractQueryProtocol.java:1076)
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.getResult(AbstractQueryProtocol.java:1031)
    at org.mariadb.jdbc.internal.protocol.AbstractQueryProtocol.executePreparedQuery(AbstractQueryProtocol.java:617)
    at org.mariadb.jdbc.MariaDbServerPreparedStatement.executeInternal(MariaDbServerPreparedStatement.java:401)
    ... 75 common frames omitted

Spring Batch executes the writer, but after the writer has run I get the following exception:

18:07:06 [main] WARN  o.h.e.jdbc.spi.SqlExceptionHelper - SQL Error: 1452, SQLState: 23000
18:07:06 [main] ERROR o.h.e.jdbc.spi.SqlExceptionHelper - (conn:554369) Cannot add or update a child row: a foreign key constraint fails (`domain`.`table_dummy`, CONSTRAINT `Test_CCA_PR_FK9` FOREIGN KEY (`RT_ID`) REFERENCES `table_type_dummy` (`RT_ID`))
Query is: insert into table_dummy (is_new, otherparameo_ot_id, rt_id, status, req_sub_type, created_by, created_datetime, updated_by, updated_datetime, unique_ref_num, rst_id) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?), parameters [0,<null>,'test123','A','Test','group','2017-10-06 18:07:04.726',<null>,<null>,'test','testtext']
18:07:06 [main] INFO  o.h.e.j.b.internal.AbstractBatchImpl - HHH000010: On release of batch it still contained JDBC statements
18:07:06 [main] INFO  o.s.b.core.step.tasklet.TaskletStep - Commit failed while step execution data was already updated. Reverting to old version.

I hope you can help.

2 answers

韩羽
2023-03-14

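As "user8740552" mentioned, I was able to solve the problem with the same approach.

"Inject the entity manager into the writer and call flush after saving the chunk's list of items in the writer, i.e. after the for loop inside the write() method itself."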

杜元明
2023-03-14

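Inject the entity manager into the writer and call flush after the save call, because the commit is outside the scope of the item writer. You can flush once after the whole chunk list has been saved in the writer, so it will not have an impact on performance.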
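The effect of that flush can be illustrated with a plain-Java model. This is a sketch under stated assumptions: `FlushTimingModel` and `FakePersistenceContext` are hypothetical stand-ins invented here, not JPA or Spring classes. In a real writer, `persist` would correspond to the repository save call and the explicit `flush` to `EntityManager.flush()` at the end of `ItemWriter.write(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of deferred flushing; the types are stand-ins, not JPA or Spring classes. */
final class FlushTimingModel {

    /** Stand-in for a persistence context: inserts are queued and only executed on flush(). */
    static final class FakePersistenceContext {
        private final List<String> pending = new ArrayList<>();
        final List<String> table = new ArrayList<>();

        void persist(String row) {
            pending.add(row);                       // nothing reaches the "database" yet
        }

        void flush() {
            List<String> toInsert = new ArrayList<>(pending);
            pending.clear();
            for (String row : toInsert) {
                if (row.startsWith("bad")) {        // simulated constraint violation
                    throw new IllegalStateException("foreign key constraint fails: " + row);
                }
            }
            table.addAll(toInsert);
        }
    }

    /** Returns the phase in which the failure surfaced: "write", "commit" or "none". */
    static String whereDoesItFail(List<String> chunk, boolean flushInsideWrite) {
        FakePersistenceContext ctx = new FakePersistenceContext();
        try {
            for (String item : chunk) {
                ctx.persist(item);                  // the writer's save loop
            }
            if (flushInsideWrite) {
                ctx.flush();                        // explicit flush at the end of write()
            }
        } catch (IllegalStateException e) {
            return "write";                         // raised inside the writer call
        }
        try {
            ctx.flush();                            // flush triggered by the transaction commit
        } catch (IllegalStateException e) {
            return "commit";                        // raised after the writer has returned
        }
        return "none";
    }

    public static void main(String[] args) {
        List<String> chunk = Arrays.asList("a", "bad", "c");
        System.out.println("without flush: " + whereDoesItFail(chunk, false)); // commit
        System.out.println("with flush:    " + whereDoesItFail(chunk, true));  // write
    }
}
```

Without the explicit flush, the failure only appears at commit time, which matches the stack trace in the question (the exception comes from JpaTransactionManager.doCommit, not from the writer). That is presumably why shouldSkip and onSkipInWrite were never reached. With the flush, the exception is thrown while the writer is still executing, where the fault-tolerant step can handle it.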
