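Goal
Consume messages from a (source) MQ queue and publish them to
a) another (destination) MQ queue and
b) a Kafka topic, all within one transaction, so that the message is not removed from the source MQ if publishing to either MQ or Kafka fails.
Frameworks used
Spring Boot - 2.1.5
Spring JMS - 5.1.7
Spring Kafka - 2.2.6
Confluent Kafka - 5.3
IBM MQ - 9
Kafka configuration class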
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
@Slf4j
@Getter
@Setter
@ToString
@EnableTransactionManagement
public class KafkaConfig {
/** injected local properties */
public Map<String, Object> producerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
log.info("Value of transaction id 0 {}",transactionIdPrefix);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionIdPrefix);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> consumerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutocommit);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> sslCommonConfigs(Map<String, Object> props) throws IOException {
log.info("kafka config {}",this);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, specificAvroReader);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(trustStoreValue, "kafka_truststore.jks"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePw);
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE); //"JKS"
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(keyStoreValue, "kafka_keystore.jks"));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePw);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePw);
return props;
}
@Bean
public ProducerFactory producerFactory() throws IOException {
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix(this.transactionIdPrefix);
return producerFactory;
}
@Bean
public KafkaTemplate<String, RawPage> ddaKafkaTemplate() throws IOException {
return new KafkaTemplate<String, RawPage>(producerFactory());
}
@Bean
public KafkaTransactionManager<String,RawPage> kafkaTransactionManager(ProducerFactory<String, RawPage> producerFactory) {
log.info("producerFactory.transactionCapable() {}",producerFactory.transactionCapable());
KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory);
transactionManager.setNestedTransactionAllowed(true);
transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return transactionManager;
}
Application configuration class
@Slf4j
@Configuration
public class ApplicationConfig {
@Bean
public JmsListenerContainerFactory<?> myMessageFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer,
ChainedTransactionManager chainedTransactionManager) {
log.debug("Connection factory instance as received {} {}",connectionFactory,configurer);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setTransactionManager(chainedTransactionManager);
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
log.debug("Returning the myMessageFactory factory instance as {}",factory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(KafkaTransactionManager kafkaTransactionManager, JmsTransactionManager jmsTransactionManager){
return new ChainedKafkaTransactionManager(jmsTransactionManager,kafkaTransactionManager );
}
}
Actual consumer and publisher code
@Service
@Slf4j
@Setter
@Getter
public class MyMessageProcessor {
@Autowired
private KafkaTemplate<String, Event> kafkaTemplate;
@Autowired
private JmsTemplate jmsTemplate;
@JmsListener(destination = "desintationQueue"
,containerFactory = "myMessageFactory")
public void receiveMessage(TextMessage message){
try {
log.info("Received message {}",message.getText());
send(destinationQueueName,message.getText());
// build avro event
publish(event);
// only acknowledge if the message is successfully processed till kafka publication
message.acknowledge();
}catch (JMSException|CustomKafkaPublicationException e){
log.error("Error in consuming the message from sourceSystem {}", ExceptionUtils.getStackTrace(e));
}
}
public void send(String queueName,final String msg) throws CustomKafkaPublicationException{
if(StringUtils.isEmpty(msg)|| StringUtils.isEmpty(queueName)){
String errorMessage = String.format("Incorrect message and queue details msg %s queueName %s.",msg,queueName);
log.error(errorMessage);
throw new CustomKafkaPublicationException(errorMessage);
}
log.info("Publishing the message to destination queue {} at time in millis {}",queueName,System.currentTimeMillis());
jmsTemplate.convertAndSend(queueName, msg);
log.info("Published the message to queue {} at time in millis {}",queueName,System.currentTimeMillis());
}
Main Spring Boot class
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@EnableJms
@Slf4j
@EnableTransactionManagement
@EnableRetry
@EnableAutoConfiguration(exclude =
{JmsHealthIndicatorAutoConfiguration.class, KafkaAutoConfiguration.class})
public class MyConsumerApplication {
Error log
- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:49.270 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-12-08 19:39:49.332 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Created JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f] from Connection [com.ibm.mq.jms.MQConnection@5d644e4a]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state READY to IN_TRANSACTION
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.848 DEBUG 37524 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Received message of type [class com.ibm.jms.JMSTextMessage] from consumer [com.ibm.mq.jms.MQQueueReceiver@2826f379] of transactional session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:49.849 DEBUG 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : Processing [org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@53b59c04]
2019-12-08 19:39:49.849 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Received message 1221222112#
2019-12-08 19:39:49.859 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing the message to destination queue DEST_QUEUE at time in millis 1575833989859
2019-12-08 19:39:49.861 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.861 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Executing callback on JMS Session: com.ibm.mq.jms.MQSession@1a7d298f
2019-12-08 19:39:49.875 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Sending created message:
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: null
JMSTimestamp: 0
JMSCorrelationID: null
JMSDestination: null
JMSReplyTo: null
JMSRedelivered: false
1221222112#
2019-12-08 19:39:49.902 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.905 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Published the message to queue DEST_QUEUE at time in millis 1575833989905
2019-12-08 19:39:49.997 DEBUG 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing message with key 106e096a-4633-49c8-abaa-a8d0bade84d2: value {"content": "1221222112#", "sourceType": "MQ", "sourceLocation": "MINT", "msgType": null, "correlationId": "106e096a-4633-49c8-abaa-a8d0bade84d2", "receivedTs": 1575833989997}
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.007 TRACE 37524 --- [enerContainer-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Requesting metadata update for topic TOPIC-SAMP-DATA.
2019-12-08 19:39:50.170 INFO 37524 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: O5fhv74bT9KIkV17ia8snQ
2019-12-08 19:39:50.309 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
2019-12-08 19:39:50.310 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error in consuming the message from sourceSystem com.mysample.consumer.exception.CustomKafkaPublicationException: Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:111)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : No result object given - no result to handle
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.316 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=SAMP-CON-0, producerId=435000, producerEpoch=39, result=COMMIT)
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state COMMITTING_TRANSACTION to READY
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Initiating transaction commit
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Committing JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:50.396 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.411 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Resuming suspended transaction after completion of inner transaction
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.412 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
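Problem
The Kafka producer does not roll back the transaction, which results in ...
Things tried
Using Throwable vs. Exception for the rollback
Moving the code that publishes to the queue and to the topic into separate classes
How can we test whether the rollback scenario works for the Kafka producer?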
Try using ChainedKafkaTransactionManager
and wire all the transaction managers you use into it:
@Bean
public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(DataSourceTransactionManager dataSourceTransactionManager,
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(kafkaTransactionManager, dataSourceTransactionManager);
}
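In my case, the DataSourceTransactionManager had been replaced by the KafkaTransactionManager. As a result, @Transactional stopped working: runtime exceptions were no longer rolled back.
The ChainedTransactionManager works as expected. Keep in mind that this is not an XA transaction, just a simple chained one-phase commit.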
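To illustrate the "chained one-phase commit" point: as far as I know, a chained transaction manager starts the transactions in the order the managers are declared and commits them in reverse order, each with its own independent commit. The sketch below simulates that in plain Java with no Spring types involved. Resource, runChained and demo are hypothetical names, and the commit failure is injected artificially. It shows the window that XA would close: if the last commit fails, the earlier one has already gone through.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedCommitSketch {

    /** Hypothetical stand-in for one transactional resource (not a Spring type). */
    static class Resource {
        final String name;
        final boolean failOnCommit; // artificial failure switch, for illustration only
        String state = "NONE";

        Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void begin() { state = "ACTIVE"; }

        void commit() {
            if (failOnCommit) {
                state = "ROLLED_BACK";
                throw new IllegalStateException("commit failed on " + name);
            }
            state = "COMMITTED";
        }

        void rollback() { state = "ROLLED_BACK"; }
    }

    /** Begins in declaration order, then commits in reverse order, one resource at a time. */
    static void runChained(List<Resource> resources, Runnable work) {
        for (Resource r : resources) {
            r.begin();
        }
        try {
            work.run();
        } catch (RuntimeException e) {
            // Work failed: nothing is committed yet, so every resource can roll back.
            for (int i = resources.size() - 1; i >= 0; i--) {
                resources.get(i).rollback();
            }
            throw e;
        }
        for (int i = resources.size() - 1; i >= 0; i--) {
            try {
                resources.get(i).commit();
            } catch (RuntimeException e) {
                // Resources committed earlier stay committed; only the rest can roll back.
                for (int j = i - 1; j >= 0; j--) {
                    resources.get(j).rollback();
                }
                throw e;
            }
        }
    }

    /** Chains JMS first and Kafka second, mirroring the order used in the question. */
    static String demo(boolean jmsCommitFails) {
        Resource jms = new Resource("jms", jmsCommitFails);
        Resource kafka = new Resource("kafka", false);
        List<Resource> chain = new ArrayList<>();
        chain.add(jms);
        chain.add(kafka);
        try {
            runChained(chain, () -> { });
        } catch (RuntimeException e) {
            // Ignored here: the demo only reports the final states.
        }
        return "jms=" + jms.state + ", kafka=" + kafka.state;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // jms=COMMITTED, kafka=COMMITTED
        System.out.println(demo(true));  // jms=ROLLED_BACK, kafka=COMMITTED
    }
}
```

With the order from the question (JMS first, Kafka second), Kafka commits before JMS, which matches the sequence in the error log. The second case is the one to plan for: the Kafka record is already published while the JMS message will be redelivered, so the downstream consumers should tolerate duplicates.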
at com.mysample.consumer.MyMessageProcessor$$FastClassBySpringCGLIB$$a0eef45f.invoke(<generated>)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
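I don't see a transaction interceptor in the stack trace, which means @Transactional is not taking effect. You need @EnableTransactionManagement on a @Configuration class.
However, you don't actually need @Transactional at all: just inject the ChainedTransactionManager into the listener container factory, and the container will start both transactions there.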
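One more thing worth checking in the listener from the question: receiveMessage catches JMSException and CustomKafkaPublicationException and only logs them, and the error log shows the Kafka transaction moving to COMMITTING_TRANSACTION right after the serialization error. A transaction wrapped around a listener can generally only be rolled back when the exception propagates out of the listener. The following is a minimal plain-Java sketch of that rule, with no Spring involved. invokeInTransaction, publish and both listener methods are hypothetical names used only for illustration.

```java
public class SwallowedExceptionSketch {

    /** Hypothetical stand-in for a container that wraps the listener call in a transaction. */
    static String invokeInTransaction(Runnable listener) {
        try {
            listener.run();
            return "COMMIT";   // the listener returned normally
        } catch (RuntimeException e) {
            return "ROLLBACK"; // the exception reached the container
        }
    }

    /** Simulates a publish step that always fails. */
    static void publish() {
        throw new IllegalStateException("Error serializing Avro message");
    }

    /** Catches and only logs, like the listener in the question. */
    static void swallowingListener() {
        try {
            publish();
        } catch (IllegalStateException e) {
            System.err.println("Error in consuming the message: " + e.getMessage());
        }
    }

    /** Logs and rethrows, so the container sees the failure. */
    static void rethrowingListener() {
        try {
            publish();
        } catch (IllegalStateException e) {
            System.err.println("Error in consuming the message: " + e.getMessage());
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeInTransaction(SwallowedExceptionSketch::swallowingListener)); // COMMIT
        System.out.println(invokeInTransaction(SwallowedExceptionSketch::rethrowingListener)); // ROLLBACK
    }
}
```

If this is what is happening, rethrowing from the catch block (for example wrapped in a runtime exception) should let the chained transaction manager roll back both the JMS and the Kafka transaction.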