Question:

KafkaProducer and KafkaTransactionManager do not roll back on exception

伊羽
2023-03-14

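Goal

Consume messages from a (source) MQ queue and publish them to

a) another (destination) MQ queue and

b) a Kafka topic, all within one transaction, so that the message is not removed from the source MQ if publishing to either MQ or Kafka fails.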

Frameworks used

Spring Boot: 2.1.5

Spring JMS: 5.1.7

Spring Kafka: 2.2.6

Confluent Kafka: 5.3

IBM MQ: 9

Kafka configuration class

@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
@Slf4j
@Getter
@Setter
@ToString
@EnableTransactionManagement
public class KafkaConfig {
    /** injected local properties */
    public Map<String, Object> producerConfigs() throws IOException{
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        log.info("Value of transaction id 0 {}",transactionIdPrefix);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionIdPrefix);
        sslCommonConfigs(props);
        return props;
    }

    public Map<String, Object> consumerConfigs() throws IOException{
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutocommit);
        sslCommonConfigs(props);
        return props;
    }

    public Map<String, Object> sslCommonConfigs(Map<String, Object> props) throws IOException {
        log.info("kafka config {}",this);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, specificAvroReader);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(trustStoreValue, "kafka_truststore.jks"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePw);
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE); //"JKS"
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(keyStoreValue, "kafka_keystore.jks"));
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePw);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePw);
        return props;
    }

    @Bean
    public ProducerFactory<String, RawPage> producerFactory() throws IOException {
        DefaultKafkaProducerFactory<String, RawPage> producerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        // Setting a transaction id prefix is what makes the factory transaction-capable.
        producerFactory.setTransactionIdPrefix(this.transactionIdPrefix);
        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String, RawPage> ddaKafkaTemplate() throws IOException {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, RawPage> kafkaTransactionManager(ProducerFactory<String, RawPage> producerFactory) {
        log.info("producerFactory.transactionCapable() {}", producerFactory.transactionCapable());
        KafkaTransactionManager<String, RawPage> transactionManager = new KafkaTransactionManager<>(producerFactory);
        transactionManager.setNestedTransactionAllowed(true);
        transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
        return transactionManager;
    }
}

Application configuration class

@Slf4j
@Configuration
public class ApplicationConfig {
    @Bean
    public JmsListenerContainerFactory<?> myMessageFactory(ConnectionFactory connectionFactory,
                                                            DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                            ChainedTransactionManager chainedTransactionManager) {
        log.debug("Connection factory instance as received {} {}",connectionFactory,configurer);
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setTransactionManager(chainedTransactionManager);
        factory.setSessionTransacted(true);
        configurer.configure(factory, connectionFactory);
        log.debug("Returning the myMessageFactory factory instance as {}",factory);
        return factory;
    }
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    @Bean
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean
    public ChainedKafkaTransactionManager chainedTransactionManager(KafkaTransactionManager kafkaTransactionManager, JmsTransactionManager jmsTransactionManager){
        return new ChainedKafkaTransactionManager(jmsTransactionManager,kafkaTransactionManager );
    }
}

Actual consumer and publishing code

@Service
@Slf4j
@Setter
@Getter
public class MyMessageProcessor {
    @Autowired
    private KafkaTemplate<String, Event> kafkaTemplate;
    @Autowired
    private JmsTemplate jmsTemplate;

    @JmsListener(destination = "desintationQueue",
            containerFactory = "myMessageFactory")
    public void receiveMessage(TextMessage message) {
        try {
            log.info("Received message {}", message.getText());
            send(destinationQueueName, message.getText());
            // build avro event
            publish(event);
            // only acknowledge if the message is successfully processed till kafka publication
            message.acknowledge();
        } catch (JMSException | CustomKafkaPublicationException e) {
            // The exception is logged here and not rethrown.
            log.error("Error in consuming the message from sourceSystem {}", ExceptionUtils.getStackTrace(e));
        }
    }

    public void send(String queueName, final String msg) throws CustomKafkaPublicationException {
        if (StringUtils.isEmpty(msg) || StringUtils.isEmpty(queueName)) {
            String errorMessage = String.format("Incorrect message and queue details msg %s queueName %s.", msg, queueName);
            log.error(errorMessage);
            throw new CustomKafkaPublicationException(errorMessage);
        }
        log.info("Publishing the message to destination queue {} at time in millis {}", queueName, System.currentTimeMillis());
        jmsTemplate.convertAndSend(queueName, msg);
        log.info("Published the message to queue {} at time in millis {}", queueName, System.currentTimeMillis());
    }

    // publish(...) is not shown in the original post
}

Main Spring Boot class

@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@EnableJms
@Slf4j
@EnableTransactionManagement
@EnableRetry
@EnableAutoConfiguration(exclude = 
{JmsHealthIndicatorAutoConfiguration.class, KafkaAutoConfiguration.class})
public class MyConsumerApplication {

Error log

    - [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:49.270 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-12-08 19:39:49.332 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Created JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f] from Connection [com.ibm.mq.jms.MQConnection@5d644e4a]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state READY to IN_TRANSACTION
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.848 DEBUG 37524 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Received message of type [class com.ibm.jms.JMSTextMessage] from consumer [com.ibm.mq.jms.MQQueueReceiver@2826f379] of transactional session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:49.849 DEBUG 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : Processing [org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@53b59c04]
2019-12-08 19:39:49.849  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Received message 1221222112#
2019-12-08 19:39:49.859  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Publishing the message to destination queue DEST_QUEUE at time  in millis 1575833989859
2019-12-08 19:39:49.861 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.861 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate   : Executing callback on JMS Session: com.ibm.mq.jms.MQSession@1a7d298f
2019-12-08 19:39:49.875 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate   : Sending created message: 
  JMSMessage class: jms_text
  JMSType:          null
  JMSDeliveryMode:  2
  JMSDeliveryDelay: 0
  JMSDeliveryTime:  0
  JMSExpiration:    0
  JMSPriority:      4
  JMSMessageID:     null
  JMSTimestamp:     0
  JMSCorrelationID: null
  JMSDestination:   null
  JMSReplyTo:       null
  JMSRedelivered:   false
1221222112#
2019-12-08 19:39:49.902 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.905  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Published the message to queue DEST_QUEUE at time in millis 1575833989905
2019-12-08 19:39:49.997 DEBUG 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Publishing message with key 106e096a-4633-49c8-abaa-a8d0bade84d2: value {"content": "1221222112#", "sourceType": "MQ", "sourceLocation": "MINT", "msgType": null, "correlationId": "106e096a-4633-49c8-abaa-a8d0bade84d2", "receivedTs": 1575833989997}
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.007 TRACE 37524 --- [enerContainer-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Requesting metadata update for topic TOPIC-SAMP-DATA.
2019-12-08 19:39:50.170  INFO 37524 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: O5fhv74bT9KIkV17ia8snQ
2019-12-08 19:39:50.309 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
    at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)
.
2019-12-08 19:39:50.310 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Error in consuming the message from sourceSystem com.mysample.consumer.exception.CustomKafkaPublicationException: Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
    at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)
.
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:111)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : No result object given - no result to handle
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.316 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=SAMP-CON-0, producerId=435000, producerEpoch=39, result=COMMIT)
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state COMMITTING_TRANSACTION to READY
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Initiating transaction commit
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Committing JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:50.396 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.411 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Resuming suspended transaction after completion of inner transaction
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.412 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager   : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

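Problem

The Kafka producer does not roll back the transaction, and as a result:

  1. The destination queue contains data that should not be there.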

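Things tried

  • Using Throwable vs. Exception in rollbackFor

  • Moving the code that publishes to the queue and to the topic into separate classes

How can we test whether the rollback scenario works for the Kafka producer?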

  • 2 answers

    云啸
    2023-03-14

    Try using ChainedKafkaTransactionManager and wire all the transaction managers you use into it:

        @Bean
        public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(DataSourceTransactionManager dataSourceTransactionManager,
                                                                                        KafkaTransactionManager<String, String> kafkaTransactionManager) {
            return new ChainedKafkaTransactionManager<String, String>(kafkaTransactionManager, dataSourceTransactionManager);
        }
    

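    In my case the DataSourceTransactionManager had been replaced by the KafkaTransactionManager, so @Transactional stopped working: runtime exceptions no longer triggered a rollback.

    ChainedTransactionManager works as expected. Keep in mind that this is not an XA transaction, just a simple chained one-phase commit.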
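
To illustrate why a chained one-phase commit is weaker than XA, here is a toy model in plain Java. It is not the Spring implementation. It assumes the ordering visible in the question's log: transactions begin in the configured order (JMS, then Kafka) and commit in reverse order (Kafka, then JMS). `Participant` and `run` are hypothetical names introduced only for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedCommitSketch {

    /** Hypothetical stand-in for one transaction manager in the chain. */
    static final class Participant {
        final String name;
        final boolean failOnCommit;
        String state = "NONE";

        Participant(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void begin() { state = "ACTIVE"; }

        void commit() {
            if (failOnCommit) {
                state = "ROLLED_BACK";
                throw new IllegalStateException(name + " commit failed");
            }
            state = "COMMITTED";
        }

        void rollback() { state = "ROLLED_BACK"; }
    }

    /** Begins in the given order, then commits in reverse order, one participant at a time. */
    static List<String> run(List<Participant> chain) {
        List<String> events = new ArrayList<>();
        for (Participant p : chain) {
            p.begin();
            events.add("begin " + p.name);
        }
        for (int i = chain.size() - 1; i >= 0; i--) {
            Participant p = chain.get(i);
            try {
                p.commit();
                events.add("commit " + p.name);
            } catch (IllegalStateException e) {
                events.add("commit failed " + p.name);
                // Only participants that have not committed yet can still be rolled back.
                for (int j = i - 1; j >= 0; j--) {
                    chain.get(j).rollback();
                    events.add("rollback " + chain.get(j).name);
                }
                break;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Participant jms = new Participant("jms", true);    // JMS commit fails in this scenario
        Participant kafka = new Participant("kafka", false);
        System.out.println(run(Arrays.asList(jms, kafka)));
        System.out.println("jms=" + jms.state + ", kafka=" + kafka.state);
    }
}
```

In this scenario the Kafka participant has already committed by the time the JMS commit fails, so nothing can undo it. That window is what a chained one-phase commit leaves open, which is why the receiving side usually has to tolerate duplicates.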

    元嘉木
    2023-03-14
    at com.mysample.consumer.MyMessageProcessor$$FastClassBySpringCGLIB$$a0eef45f.invoke(<generated>)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    

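    I don't see a transaction interceptor in the stack trace, which means @Transactional is not taking effect; you need @EnableTransactionManagement on a @Configuration class.

    However, you don't actually need @Transactional. Just inject the ChainedTransactionManager into the listener container factory, and the container will start both transactions there.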
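
One more thing worth checking: the listener in the question catches `JMSException` and `CustomKafkaPublicationException` and only logs them, and the log shows both the Kafka and the JMS transaction committing right after the error is logged. A container-managed transaction is normally committed when the listener returns and rolled back only when an exception propagates out of it. Below is a minimal plain-Java sketch of that rule. It is not Spring code: `FakeResource`, `deliver`, `SWALLOWING`, and `RETHROWING` are hypothetical names standing in for a transactional session, the container's invoke-then-commit step, and two listener styles.

```java
import java.util.ArrayList;
import java.util.List;

public class RollbackSketch {

    /** Hypothetical stand-in for a transactional resource such as a JMS session. */
    static final class FakeResource {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void send(String msg) { pending.add(msg); }

        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        void rollback() { pending.clear(); }
    }

    interface Listener {
        void onMessage(String msg, FakeResource queue) throws Exception;
    }

    /** Mimics the container: commit on normal return, roll back when the listener throws. */
    static boolean deliver(String msg, FakeResource queue, Listener listener) {
        try {
            listener.onMessage(msg, queue);
            queue.commit();
            return true;
        } catch (Exception e) {
            queue.rollback();
            return false;
        }
    }

    /** Simulates the failing Kafka publish from the question. */
    static void failingPublish() throws Exception {
        throw new IllegalStateException("serialization failed");
    }

    /** Logs-and-swallows style: the container never sees the failure. */
    static final Listener SWALLOWING = (msg, queue) -> {
        try {
            queue.send(msg);
            failingPublish();
        } catch (Exception e) {
            // swallowed, as in the question's catch block
        }
    };

    /** Lets the failure propagate to the container. */
    static final Listener RETHROWING = (msg, queue) -> {
        queue.send(msg);
        failingPublish();
    };

    public static void main(String[] args) {
        FakeResource q1 = new FakeResource();
        System.out.println("swallowing: committed=" + deliver("m1", q1, SWALLOWING) + " " + q1.committed);
        FakeResource q2 = new FakeResource();
        System.out.println("rethrowing: committed=" + deliver("m1", q2, RETHROWING) + " " + q2.committed);
    }
}
```

In the real listener, the equivalent change would likely be to rethrow the exception (for example wrapped in a runtime exception) instead of only logging it, so the chained transaction manager gets a chance to roll back both resources.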
