当前位置: 首页 > 知识库问答 >
问题:

@Kafkalistener的序列化

钱展
2023-03-14

我使用的是spring-boot 2.3.2。使用spring-kafka->2.5.4发布。release kafka-clients->2.5.0

我有以下简单的监听器

@Slf4j
@Component
class SampleKafkaListener {
    private final ObjectMapper objectMapper;

    @Autowired
    SampleKafkaListener(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "person-topic-as-string")
    public void onMessageReceived(String personMsg) throws JsonProcessingException {
        Person person = objectMapper.readerFor(Person.class).readValue(personMsg);
        log.info("Received person {}", person);
    }

    @KafkaListener(topics = "persons-topic-obj")
    public void onMessageReceived(Person person) {
        log.info("Received person {}", person);
    }
}
 spring:
   kafka:
     consumer:
       group-id: mine-group
{"firstName": "John", "lastName": "Nakamura"}
    null
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.experiments.kafka.SampleKafkaListener.onMessageReceived(com.experiments.kafka.Person)]
Bean [com.experiments.kafka.SampleKafkaListener@426229a1]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1923)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1911)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1810)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}], failedMessage=GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
        ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.experiments.kafka.Person] for GenericMessage [payload={"firstName":"John","lastName":"Nakamura"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b486f55, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=persons-topic-obj, kafka_receivedTimestamp=1598347300390, kafka_groupId=mine-group}]
        at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
        at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:891)
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
        ... 13 common frames omitted

如果我使用

spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

然后它就会失败,不再有异常循环

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1263)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1020)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition persons-topic-obj-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:205)
    at com.sun.proxy.$Proxy120.poll(Unknown Source)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.lang.Thread.run(Thread.java:834)


共有1个答案

邢飞白
2023-03-14

默认情况下,Kafka使用者使用StringDeserializer。您需要通过设置正确的反序列化器来配置它(Spring-Kafka已经有JsonDeserializer)

spring:
   kafka:
     consumer:
       group-id: mine-group
       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
 类似资料:
  • 我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-

  • 我只想了解@kafkaListener的范围是什么,原型还是单例。在单个主题的多个消费者的情况下,返回的是单个实例还是多个实例。在我的情况下,我有多个客户订阅单个主题并获得报告。我只是想知道如果 > 多个客户希望同时查询报告。在我的例子中,我在成功使用消息后关闭容器,但同时如果其他人想要获取报告,则容器应该打开。 如何将作用域更改为与容器的id相关联的原型(如果不是),以便每次都可以生成单独的实例

  • 我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。 我定义了bean of 和spring启动配置: 我看到所有配置都能正常工作,我在jmx中看到了我的10个线程: 但是我做了这样的测试: 如果版本是 也许我的期望不是真的,这是Kafka听众的正确行为。请

  • 我正在编写一个应用程序(Spring Kotlin),它可以获取Kafka的信息。如果我在声明@KafkaListener时设置autoStartup=“true”,则应用程序可以正常运行,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。 为了避免应用程序在启动时崩溃,另一个主题建议在声明@KafkaListener时设置autoStar

  • 我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行: 在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。 另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。 附言:我不想使用@KafkaListener。

  • 我正在运行spring boot,KafkaListener是我的客户。问题是我们如何从失败的kafka配置中恢复,并避免应用程序在退出代码为0的过程结束时停止。例如,不正确的配置可能是不正确的endpointurl。如果无法访问Kafka服务器,也会出现同样的情况。因此,在任何情况下,KafkaListner进程都不应该杀死服务器。 ontext.java:895应用程序上下文异常:未能启动be