当前位置: 首页 > 知识库问答 >
问题:

spring-cloud-stream和kafka-clients向后兼容

谢奕
2023-03-14

我们有一个微服务,目前使用spring-cloud-streamditmars.release,它又使用kafka-clients0.10.1.1。
我们有兴趣升级到spring-cloud-stream2.0.0.rc3,它反过来使用kafka-clients1.0.0,以解决我们遇到的一个问题:重新处理Kafka消息。
仅将一个服务升级到spring-boot2.0.0.release和spring-cloud-stream2.0.0.rc3之后,我们遇到了奇怪的行为:

升级的服务(从现在起我将称之为service-1)是某个主题send-enlorming-mail的生产者。另外,我们还有另一个(service-2),它使用spring-cloud-streamkafka-clients的旧版本,也是本主题的使用者。当我们引导service-1时,它用适当的分区创建了这个主题,然后我们引导service-2(它在新版本中使用),最后从service-1生成一条消息,我们收到以下异常

 org.springframework.messaging.MessageHandlingException: nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:406) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:725) ~[spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at com.watercorp.app.messaging.producer.MessageProducer.send(MessageProducer.java:39) ~[classes/:na]
    at com.watercorp.app.service.EnrollmentServiceImpl.sendEnrollmentMail(EnrollmentServiceImpl.java:56) ~[classes/:na]
    at com.watercorp.app.service.EnrollmentServiceImpl.enrollUsers(EnrollmentServiceImpl.java:50) ~[classes/:na]
    at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:60) ~[classes/:na]
    at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:24) ~[classes/:na]
    at com.watercorp.app.messaging.consumer.MessageConsumer.lambda$handleMessageWithRetry$0(MessageConsumer.java:84) [classes/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at com.watercorp.app.messaging.consumer.MessageConsumer.handleMessageWithRetry(MessageConsumer.java:77) [classes/:na]
    at com.watercorp.app.messaging.consumer.MessageConsumer.handleUsersEnrollmentMessage(MessageConsumer.java:65) [classes/:na]
    at com.watercorp.app.messaging.consumer.MessageConsumer$$FastClassBySpringCGLIB$$8b03a437.invoke() [classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:112) [spring-context-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at com.watercorp.app.messaging.consumer.MessageConsumer$$EnhancerBySpringCGLIB$$a3ae110f.handleUsersEnrollmentMessage() [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_121]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_121]
    at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_121]
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult(KafkaProducerMessageHandler.java:507) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:398) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    ... 62 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:354) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-1.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.0.jar:na]
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
ERROR 22159 --- [ad | producer-5] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{123, 34, 109, 101, 115, 115, 97, 103, 101, 84, 121, 112, 101, 34, 58, 34, 69, 78, 82, 79, 76, 76, 7...' to topic send-enrollment-mail and partition 7:
WARN 11837 --- [           -C-1] o.a.k.c.consumer.internals.Fetcher       : Unknown error fetching data for topic-partition send-enrollment-mail-5
WARN 11837 --- [           -C-1] o.a.k.c.consumer.internals.Fetcher       : Unknown error fetching data for topic-partition send-enrollment-mail-5

共有1个答案

姬捷
2023-03-14

您必须使用headermode=embeddedheadersnone,以便与旧的(Ditmars)SCSt应用程序兼容(取决于这些应用程序使用更多的header)。2.0应用程序的原生头模式是native-因为Kafka现在支持头。

 类似资料:
  • 我有一个问题反序列化来自Kafka主题的消息。这些消息已经使用spring-cloud-stream和Apache Avro序列化。我正在用斯普林斯·Kafka阅读它们,并试图反序列化它们。如果我使用spring-cloud来生成和使用消息,那么我就可以很好地反序列化消息。问题是当我用Spring Kafka消费它们,然后试图反序列化。 我正在使用一个模式注册表(用于开发的spring-boot模

  • spring-cloud-stream-kafka-elasticsearch The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-se

  • Kafka Broker 1.0.2 Spring Boot 1.5.2 SpringKafka1.2.2 Spring Cloud Stream Chelsea.sr2(Spring Cloud Stream Core 1.2.2.Release) 几天前,Spring-Kafka和Spring Boot之间的兼容性在一个不同的问题中被问及。Spring-Kafka项目页面更新了更多关于兼容性的

  • kafka-xxx:本机应用程序 spring-boot-xxx:Spring Cloud Stream Applications 问题是由原生Kafka生成器生成的Avro消息不能被Spring Cloud Stream应用程序解封,例如: 原生Kafka生产者(Kafka-客户-服务项目) 在本例中,本机应用程序直接崩溃,出现异常() 如何确保Spring Cloud Stream Produ

  • 我最近开始为Kafka研究Spring Cloud Stream,并且一直在努力使TestBinder与Kstreams一起工作。这是一个已知的限制,还是我忽略了什么? 这很好: 字符串处理器: 字符串测试: 但当我试图在流程中使用KStream时,我无法让TestBinder正常工作。 Kstream处理器: KStream测试: 正如您可能已经注意到的,我从Kstream处理器中省略了@Str

  • 当尝试运行稍微修改过的word count示例版本时,我遇到了一个错误,即“没有符合条件的'org.apache.kafka.streams.kstream.KStreamBuilder'类型的bean”。在我的POM中,我使用了Spring-Cloud-Stream依赖项:Elmhurst。M3导入依赖项,其中导入了spring cloud stream绑定器kstream:2.0.0。立方米。

  • 我有兴趣在一些制作人中使用Publisher确认,我们在一个项目中使用了Spring Cloud Stream。我试过做一个小的PoC,但它不起作用。据我在文档中看到的情况,Asynchronous Publisher可以确认这一点,并且应该很容易进行以下更改: 在应用程序中添加。yml confirmAckChannel并启用errorChannelEnabled属性。 然后是一个由endpoi

  • 我试图创建一个简单的程序来打印一个Kafka主题的Kstream。我不断地得到一个NPE和完全没有想法。 我已经使用了spring cloud-stream-binder-kafka-streams依赖项,并且我正在使用spring cloud的最新版本“Finchley.m9”。 我写的代码是: Application.Properties具有: 当我启动服务时,我在控制台上不断得到以下错误: