当前位置: 首页 > 知识库问答 >
问题:

基于数据库/MQ交互的spring Kafka事务管理

柴宏浚
2023-03-14

我们正试图在spring Kafka消费者中实现事务管理。

我们有Kafka消费者在收听主题A的信息-

我面临的问题是,当数据库事务提交失败时,主题B上的发送操作不会回滚。因此,系统处于不一致状态。

其他场景按预期工作。

例如:

>

  • 从kafka读取msg-

    阅读Kafka的消息-

    PS:我知道kafka不支持XA事务。我确实看到一些参考资料提到了ChainedTransactionManager的使用,根据文件,它是从spring data core 2.5版本中弃用的,所以我不想最好使用它。任何建议都将不胜感激。

    我把代码片段放在下面-

    主要内容:

    @KafkaHandler
    @Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
    public void receiveCreationMessage(String event){
     
      // saves into database
      dao.saveDraft(event);
      
      // sends kafka message
      sendMQAdapterKafkaEvent(initiateReqMq, convertedEvent);
     
    }
     
     @Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
     public void saveDraft(@NonNull String event) {
     
        entityManager.joinTransaction();
        entityManager.persist(event);
    }
     
    @Transactional(transactionManager = "kafkaTxM", propagation = Propagation.REQUIRED)
    public void sendMQAdapterKafkaEvent(String mqName, Object message) throws 
     
                kafkaTemplate.send(topicName, msgKey, message)
    }
    

    一个pplication.properties

    spring.kafka.producer.transaction-id-prefix=txnId-
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.properties.transactional-id=trans-id-
    

    Kafka监听器容器工厂:

    @Bean(KafkaConstants.ContainerFactoryNames.MANUAL_COMMIT_CONTAINER_FACTORY)
        @Autowired()
        public ConcurrentKafkaListenerContainerFactory<Object, Object> manualCommitKafkaListenerContainerFactory(
                @NonNull final ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                @NonNull final KafkaProperties kafkaProperties,
                @NonNull final RecordMessageConverter converter,
                @NonNull final ErrorHandler errorHandler,
                @NotNull final RetryTemplate retryTemplate,
                @NotNull @Qualifier("kafkaTxM") @Lazy final KafkaTransactionManager kafkaTransactionManager) {
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    
            val consumerProperties = kafkaProperties.buildConsumerProperties();
            consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            val consumerFactory = new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
    
    
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    
            factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0L)));
            factory.setAfterRollbackProcessor(new TestRollbackProcesor(errorHandler, kafkaTransactionManager, new FixedBackOff(0L, 0L)));
            configurer.configure(factory, consumerFactory);
    
            return factory;
        }
    

    Kafka生产工厂-

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(final KafkaProperties kafkaProperties) throws IOException {
    
        val factory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
        val transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        factory.transactionCapable();
        return factory;
    }
    

    德克萨斯州Kafka经理bean——

    @Bean(name = "kafkaTxM")
    public KafkaTransactionManager kafkaTransactionManager(final MyConfiguration myConfig,
                                                           final KafkaProperties kafkaProperties) throws IOException {
        KafkaTransactionManager ktm = new KafkaTransactionManager(kafkaProducerFactory(myConfig, kafkaProperties));;
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }
    

    日志-

    13:20:19.317 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {TOPICNAME=OffsetAndMetadata{offset=1904, leaderEpoch=null, metadata=''}}
    
    13:20:19.318 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] sendOffsetsToTransaction({TOPICNAME=OffsetAndMetadata{offset=1904, leaderEpoch=null, metadata=''}}, GroupMetadata(groupId = CG.MANAGER-dev-blue1, generationId = 11, memberId = C.ABC_XYZ_MANAGER-1d309d57-0245-417e-bb94-5e49fe811257-0-971960cf-6bc4-4aa0-bf46-eb15900e1abf, groupInstanceId = ))
    
    13:20:19.491 [kafka-producer-network-thread | P.ABC_XYZ_MANAGER-e57fc82a-77bc-4490-87cb-46e7b584746c-1] INFO  o.a.k.c.p.i.TransactionManager - [Producer clientId=P.ABC_XYZ_MANAGER-e57fc82a-77bc-4490-87cb-46e7b584746c-1, transactionalId=DraftMgr-CG.MANAGER-dev-blue1.TOPICNAME.0] Discovered group coordinator ivapp12230670573.devin1.ms.com:9093 (id: 3 rack: null)
    
    13:20:20.285 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.i.TransactionInterceptor - Completing transaction for [com.ms.wmbanking.draftmanager.kafka.DraftAwaitingSaveListener.receiveCreationMessage]
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.a.RecordMessagingMessageListenerAdapter - Listener method returned result [InvocationResult [result=null, sendTo=null, messageReturnType=false]] - generating response message for it
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.l.a.RecordMessagingMessageListenerAdapter - No replyTopic to handle the reply: null
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering beforeCommit synchronization
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering beforeCompletion synchronization
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Removed value [org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization@4f6f2a3e] for key [SessionImpl(322929867<open>)] from thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit
    
    13:20:20.286 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] commitTransaction()
    
    13:20:20.369 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering afterCommit synchronization
    
    13:20:20.706 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.h.e.jdbc.spi.SqlExceptionHelper - SQL Error: -302, SQLState: 22001
    
    13:20:20.707 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.h.e.jdbc.spi.SqlExceptionHelper - DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
    
    13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Clearing transaction synchronization
    
    13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.t.KafkaTransactionManager - Triggering afterCompletion synchronization
    
    13:20:20.797 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.t.s.TransactionSynchronizationManager - Removed value [org.springframework.kafka.core.KafkaResourceHolder@16feb510] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@5f113675] from thread [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
    
    13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@53e77e17] close(PT5S)
    
    13:20:20.798 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Transaction rolled back
    
    org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
    
                    at org.springframework.orm.jpa.EntityManagerFactoryUtils.convertJpaAccessExceptionIfPossible(EntityManagerFactoryUtils.java:408)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.convertException(ExtendedEntityManagerCreator.java:508)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:480)
    
                    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:136)
    
                    at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:124)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:945)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:782)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    
                    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1839)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1811)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
    
                    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    
                    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    
                    at java.lang.Thread.run(Thread.java:748)
    
    Caused by: javax.persistence.RollbackException: Error while committing the transaction
    
                    at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:81)
    
                    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:104)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:477)
    
                    ... 14 common frames omitted
    
    Caused by: javax.persistence.PersistenceException: org.hibernate.exception.DataException: could not execute statement
    
                    at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:154)
    
                    at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:181)
    
                    at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:65)
    
                    ... 16 common frames omitted
    
    Caused by: org.hibernate.exception.DataException: could not execute statement
    
                    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:52)
    
                    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
    
                    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
    
                    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
    
                    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:200)
    
                    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3302)
    
                    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3829)
    
                    at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:107)
    
                    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
    
                    at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)
    
                    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    
                    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)
    
                    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:345)
    
                    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)
    
                    at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:93)
    
                    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362)
    
                    at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:453)
    
                    at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3212)
    
                    at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2380)
    
                    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:447)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)
    
                    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)
    
                    ... 15 common frames omitted
    
    Caused by: com.ibm.db2.jcc.am.SqlDataException: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
    
                    at com.ibm.db2.jcc.am.id.a(id.java:669)
    
                    at com.ibm.db2.jcc.am.id.a(id.java:60)
    
                    at com.ibm.db2.jcc.am.id.a(id.java:127)
    
                    at com.ibm.db2.jcc.am.no.b(no.java:2310)
    
                    at com.ibm.db2.jcc.am.no.c(no.java:2293)
    
                    at com.ibm.db2.jcc.t4.cb.l(cb.java:370)
    
                    at com.ibm.db2.jcc.t4.cb.a(cb.java:62)
    
                    at com.ibm.db2.jcc.t4.q.a(q.java:50)
    
                    at com.ibm.db2.jcc.t4.tb.b(tb.java:220)
    
                    at com.ibm.db2.jcc.am.oo.oc(oo.java:3428)
    
                    at com.ibm.db2.jcc.am.oo.b(oo.java:4383)
    
                    at com.ibm.db2.jcc.am.oo.b(oo.java:4554)
    
                    at com.ibm.db2.jcc.am.oo.gc(oo.java:784)
    
                    at com.ibm.db2.jcc.am.oo.executeUpdate(oo.java:763)
    
                    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
    
                    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
    
                    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    
                    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    
                    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    
                    at java.lang.reflect.Method.invoke(Method.java:498)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.performQueryExecutionListener(StatementProxyLogic.java:316)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.access$700(StatementProxyLogic.java:37)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic$1.execute(StatementProxyLogic.java:123)
    
                    at net.ttddyy.dsproxy.listener.MethodExecutionListenerUtils.invoke(MethodExecutionListenerUtils.java:42)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.invoke(StatementProxyLogic.java:120)
    
                    at net.ttddyy.dsproxy.proxy.jdk.PreparedStatementInvocationHandler.invoke(PreparedStatementInvocationHandler.java:37)
    
                    at com.sun.proxy.$Proxy303.executeUpdate(Unknown Source)
    
                    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:197)
    
                    ... 34 common frames omitted
    
    13:20:20.801 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR c.m.w.m.kafka.TestRollbackProcesor - Backoff none exhausted for ConsumerRecord(topic = TOPICNAME, partition = 0, leaderEpoch = 16, offset = 1903, CreateTime = 1633938613586, serialized key size = -1, serialized value size = 2228, headers = RecordHeaders(headers = [RecordHeader(key = singularityheader, value = [110, 111, 116, 120, 100, 101, 116, 101, 99, 116, 61, 116, 114, 117, 101, 42, 99, 116, 114, 108, 103, 117, 105, 100, 61, 49, 54, 51, 51, 55, 55, 50, 55, 57, 52, 42, 97, 112, 112, 73, 100, 61, 55, 52, 50, 42, 110, 111, 100, 101, 105, 100, 61, 49, 50, 53, 50, 49, 48])], isReadOnly = false), key = null, value = {
    
      "DATA":"ASJHNFHJJHGJHHJK"
    
    })
    
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2117)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1865)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1811)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
    
                    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    
                    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    
                    at java.lang.Thread.run(Thread.java:748)
    
    Caused by: org.springframework.orm.jpa.JpaSystemException: Error while committing the transaction; nested exception is javax.persistence.RollbackException: Error while committing the transaction
    
                    at org.springframework.orm.jpa.EntityManagerFactoryUtils.convertJpaAccessExceptionIfPossible(EntityManagerFactoryUtils.java:408)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.convertException(ExtendedEntityManagerCreator.java:508)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:480)
    
                    at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:136)
    
                    at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:124)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:945)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:782)
    
                    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    
                    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
    
                    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1839)
    
                    ... 7 common frames omitted
    
    Caused by: javax.persistence.RollbackException: Error while committing the transaction
    
                    at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:81)
    
                    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:104)
    
                    at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerSynchronization.afterCommit(ExtendedEntityManagerCreator.java:477)
    
                    ... 14 common frames omitted
    
    Caused by: javax.persistence.PersistenceException: org.hibernate.exception.DataException: could not execute statement
    
                    at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:154)
    
                    at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:181)
    
                    at org.hibernate.internal.ExceptionConverterImpl.convertCommitException(ExceptionConverterImpl.java:65)
    
                    ... 16 common frames omitted
    
    Caused by: org.hibernate.exception.DataException: could not execute statement
    
                    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:52)
    
                    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
    
                    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
    
                    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
    
                    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:200)
    
                    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3302)
    
                    at org.hibernate.persister.entity.AbstractEntityPersister.insert(AbstractEntityPersister.java:3829)
    
                    at org.hibernate.action.internal.EntityInsertAction.execute(EntityInsertAction.java:107)
    
                    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
    
                    at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)
    
                    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    
                    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)
    
                    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:345)
    
                    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)
    
                    at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:93)
    
                    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362)
    
                    at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:453)
    
                    at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3212)
    
                    at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2380)
    
                    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:447)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)
    
                    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)
    
                    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)
    
                    ... 15 common frames omitted
    
    Caused by: com.ibm.db2.jcc.am.SqlDataException: DB2 SQL Error: SQLCODE=-302, SQLSTATE=22001, SQLERRMC=null, DRIVER=4.13.127
    
                    at com.ibm.db2.jcc.am.id.a(id.java:669)
    
                    at com.ibm.db2.jcc.am.id.a(id.java:60)
    
                    at com.ibm.db2.jcc.am.id.a(id.java:127)
    
                    at com.ibm.db2.jcc.am.no.b(no.java:2310)
    
                    at com.ibm.db2.jcc.am.no.c(no.java:2293)
    
                    at com.ibm.db2.jcc.t4.cb.l(cb.java:370)
    
                    at com.ibm.db2.jcc.t4.cb.a(cb.java:62)
    
                    at com.ibm.db2.jcc.t4.q.a(q.java:50)
    
                    at com.ibm.db2.jcc.t4.tb.b(tb.java:220)
    
                    at com.ibm.db2.jcc.am.oo.oc(oo.java:3428)
    
                    at com.ibm.db2.jcc.am.oo.b(oo.java:4383)
    
                    at com.ibm.db2.jcc.am.oo.b(oo.java:4554)
    
                    at com.ibm.db2.jcc.am.oo.gc(oo.java:784)
    
                    at com.ibm.db2.jcc.am.oo.executeUpdate(oo.java:763)
    
                    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
    
                    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:136)
    
                    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    
                    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    
                    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    
                    at java.lang.reflect.Method.invoke(Method.java:498)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.performQueryExecutionListener(StatementProxyLogic.java:316)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.access$700(StatementProxyLogic.java:37)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic$1.execute(StatementProxyLogic.java:123)
    
                    at net.ttddyy.dsproxy.listener.MethodExecutionListenerUtils.invoke(MethodExecutionListenerUtils.java:42)
    
                    at net.ttddyy.dsproxy.proxy.StatementProxyLogic.invoke(StatementProxyLogic.java:120)
    
                    at net.ttddyy.dsproxy.proxy.jdk.PreparedStatementInvocationHandler.invoke(PreparedStatementInvocationHandler.java:37)
    
                    at com.sun.proxy.$Proxy303.executeUpdate(Unknown Source)
    
                    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:197)
    
                    ... 34 common frames omitted
    
  • 共有1个答案

    巢海
    2023-03-14

    13:20:20.798[org.springframework.kafka.KafkaListenerEndpoint Container#0-0-C-1]错误o. s. k. l. KafkaMessageListenerContainer$Listener消费者-交易回滚

    Kafka事务确实回滚了。

    也许您的消费者正在使用默认的isolation.levelread_uncommitted)。

    使用Kafka时,生产者记录总是写入日志,然后是一个标记块,指示事务是提交还是回滚。

    消费者必须有隔离。级别读取提交的跳过已回滚的记录。

     类似资料:
    • 当开发者第一次接触 PHP 时,通常会使用类似下面的代码来将数据库的交互与表示层逻辑混在一起: <ul> <?php foreach ($db->query('SELECT * FROM table') as $row) { echo "<li>".$row['field1']." - ".$row['field1']."</li>"; } ?> </ul> 这从任何方面来看都是

    • 本文向大家介绍php基于session实现数据库交互的类实例,包括了php基于session实现数据库交互的类实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了php基于session实现数据库交互的类。分享给大家供大家参考。具体如下: 希望本文所述对大家的php程序设计有所帮助。

    • 在代码里事务提交后方法结束,此时数据库是否已经执行了事务?因为事务提交返回了成功说明数据库已经处理了这个事务提交,但是此时数据库实际是否已经有执行完成这个事务?还是代码里事务提交完成后数据库实际并没有执行完成,只是先返回了成功的信息?现在碰到一个情况是方法提交结束后另外一个程序立刻调用存储过程查询数据会出现查不到的情况。

    • 目前想要的效果是根据数据绘制散点图,鼠标框选得到选中的数据 纯d3的案例: https://observablehq.com/@d3/brushable-scatterplot-matrix 但d3太复杂了,看了几个库: vega-lite,Observable Plot,没找到交互方面的介绍,有推荐吗? 谢谢

    • 主要内容:1. 引入 tx 命名空间,2. 配置事务管理器,3. 配置事务通知,4. 配置切点切面,示例 Spring 声明式事务管理是通过 AOP 实现的,其本质是对方法前后进行拦截,然后在目标方法开始之前创建(或加入)一个事务,在执行完目标方法后,根据执行情况提交或者回滚事务。 声明式事务最大的优点就是对业务代码的侵入性低,可以将业务代码和事务管理代码很好地进行解耦。 Spring 实现声明式事务管理主要有 2 种方式: 基于 XML 方式的声明式事务管理。 通过 Annotation 注解

    • 我创建了一个示例--SPRING,JPA(EclipseLink持久性提供程序)和JTA事务管理器(JBoss7)。我观察到数据库中的所有数据都正确地显示在UI中以进行读操作。但是当涉及保存/更新或删除操作时,服务层不将工作提交到数据库。没有捕获到异常(我也检查了控制台/日志,并且调试了代码,可以看到EntityManager.persist/remove被调用,没有任何异常)。 > module