下面的日志每次都在后面。(同一条消息的错误几乎是30倍)
我试着重新启动Kafka和Spring Boot应用程序,仍然是同样的问题。
日志:
2019-11-06 16:04:49.176 WARN [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] o.s.k.core.DefaultKafkaProducerFactory : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@658b4494, txId=xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2]
2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to completeNormalFulfillmentItem 124
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.createAuditForCompleteFulfilmentFail(<generated>)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeDefaultFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1214)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1312)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.handleExistingTransaction(AbstractPlatformTransactionManager.java:430)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:354)
at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
... 67 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:499)
at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:103)
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:160)
... 71 common frames omitted
2019-11-06 16:04:49.178 ERROR [xxxxx-component-workflow-handler,47fb7bf746423fae,83d15cb4c9f92635,false] 10 --- [_response-4-C-1] .s.i.ComponentWorkflowHandlerServiceImpl : failed to complete fulfillment item (completeFulfillmentItem) :124
com.xxxxx.model.exception.ProcessException: failed to completeNormalFulfillmentItem 124
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeNormalFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:1316)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl.completeFulfillmentItem(ComponentWorkflowHandlerServiceImpl.java:994)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$FastClassBySpringCGLIB$$f9512a9d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.xxxxx.business.workflow.component.handler.service.impl.ComponentWorkflowHandlerServiceImpl$$EnhancerBySpringCGLIB$$63013f17.completeFulfillmentItem(<generated>)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener.handleFulfillmentComplete(WorkflowAsynHandlerListener.java:100)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$FastClassBySpringCGLIB$$f84eaee9.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at com.xxxxx.business.workflow.component.handler.listener.WorkflowAsynHandlerListener$$EnhancerBySpringCGLIB$$2ab8db7b.handleFulfillmentComplete(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$a252ca0f.onMessage(<generated>)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1308)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1291)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1252)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1177)
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1167)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1145)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:765)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
at
@Configuration
@EnableKafka
public class KafkaReceiverConfig {
// Kafka Server Configuration
@Value("${kafka.servers}")
private String kafkaServers;
// Group Identifier
@Value("${kafka.groupId}")
private String groupId;
// Kafka Max Retry Attempts
@Value("${kafka.retry.maxAttempts:5}")
private Integer retryMaxAttempts;
// Kafka Max Retry Interval
@Value("${kafka.retry.interval:180000}")
private Long retryInterval;
// Kafka Concurrency
@Value("${kafka.concurrency:10}")
private Integer concurrency;
// Kafka Concurrency
@Value("${kafka.poll.timeout:100}")
private Integer pollTimeout;
// Kafka Consumer Offset
@Value("${kafka.consumer.auto-offset-reset:earliest}")
private String offset = "earliest";
@Value("${kafka.max.records:100}")
private Integer maxPollRecords;
@Value("${kafka.max.poll.interval.time:500000}")
private Integer maxPollIntervalMs;
@Value("${kafka.max.session.timeout:200000}")
private Integer sessionTimoutMs;
// Logger
private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
factory.setStatefulRetry(true);
// NOTE: retryMaxAttempts should always +1 due to spring kafka bug
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
log.warn("failed to process kafka message (retries are exausted). topic name:" + record.topic() + " value:"
+ record.value());
messageProducer.saveFailedMessage(record, exception);
}, retryMaxAttempts + 1);
factory.setErrorHandler(errorHandler);
log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
log.debug("Kafka Receiver Config consumerFactory created");
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new ConcurrentHashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Disable the Auto Commit if required for testing
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimoutMs);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
log.debug("Kafka Receiver Config consumerConfigs created");
return props;
}
}
听者
@KafkaListener(id = TOPIC_FULFILLMENT_CREATE, topics = TOPIC_FULFILLMENT_CREATE)
@Transactional(readOnly = false)
public void processCreateRequest(@Payload String message) throws IOException {
ComponentWorkflowModel componentWorkflowModel = JsonUtil.toObject(message, ComponentWorkflowModel.class);
componentWorkflowStarter.processCreateRequest(componentWorkflowModel);
}
>
有什么解决方案可以停止收听错误消息吗?我使用了seekErrorHandler它有时只工作,有任何配置问题吗?
Kafka和Spring的Kafka有什么问题吗?
如何解决这一问题?
看起来配置不正确。
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:475)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
...
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId xxxxx-Zeebe-Process-Handler-ZK6uxfEizXyDxU-complete_fulfillment_item_response.complete_fulfillment_item_response.2: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:176)
当一个事务已经存在时,您正在尝试启动一个新的事务。
您已经将链接的事务管理器注入到侦听器容器中,因此已经存在一个事务。
有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者
大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是: 我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。 更新:我已经找到了一种方法来做它与factory.setBatchList
所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递
我们使用spring kafka配置来接收来自上游系统的消息。我们有用于主题配置的java配置 这段代码工作得非常好,因为我们有单独的容器工厂。现在我们需要启动此应用程序的多个实例,其中一个实例将监听firstContainer,而第二个Container将被禁用 并且对于第二个实例,它只会启用第二个容器并禁用第一个容器。有人能帮助了解是否可以禁用从主题(主题列表)中监听吗?
我试图了解如何跟踪Kafka的信息摄取。 我们现在遵循的工作流程是清除主题中的所有消息,然后我们用代码更改重新摄取。我需要知道那些代码更改有多成功。在当前状态下,我正在使用Kafka工具,手动刷新消息总数,并将结果保存在csv中,我知道这是不可持续的长期。 你对自动获取Kafka主题中的消息计数有什么建议?理想情况下,我想击中的主题一分钟一分钟的频率,并得到计数,以及窗口的时间,如1天等。