当前位置: 首页 > 知识库问答 >
问题:

Spring云Kafka流反序列化问题由以下原因引起:java.lang.ClassCast异常:ClassName无法转换为ClassName

孔皓
2023-03-14

我正在尝试使用Spring云Kafka流绑定器来使用来自主题的Avro消息,但无法修复此class Cast异常。

这是我的代码:

  @Bean
  public Consumer<KStream<EventKey, Event>> process(){
    return input -> {
      input.peek(((key, value) -> logger.info("key value: "+ key.toString()+" value: "+value.toString())));
      logger.info("Received:" + input);
    };
  }

  @Bean
  public Serde<EventKey> avroInSerde(){
    final SpecificAvroSerde<EventKey> avroInSerde = new SpecificAvroSerde<>();
    Map<String, Object> serdeProperties = new HashMap<>();
    return avroInSerde;
  }

  @Bean
  public Serde<Event> avroOutSerde(){
    final SpecificAvroSerde<Event> avroOutSerde = new SpecificAvroSerde<>();
    return avroOutSerde;
  }

粘合剂:

spring:
  application:
    name: ${applicaton-name}
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          destination: ${input-topic-name}
          contentType: application/Avro
        process-out-0:
          destination: ${enriched-topic-name}
          contentType: application/Avro
      binding-retry-interval: 30
      kafka:
        streams:
          binder:
            brokers: ${kafka-broker}
            application-id: ${consumer-group-name}
            auto-create-topics: false
            auto-add-partitions: false
            configuration:
              processing.guarantee: at_least_once
              auto.offset.reset: earliest
              schema.registry.url: ${kafka-schema-registry}
              auto-register-schema: false
              security.protocol: SSL
              useNativeEncoding: true
              specific.avro.reader: true

错误:

Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.ClassCastException: EventKey cannot be cast to EventKey
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)

我尝试了本链接中提到的两种方法https://spring.io/blog/2019/12/04/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-3-data-deserialization-and-serialization但没有运气

我错过了什么吗?

共有1个答案

莫兴言
2023-03-14

原因:java.lang.ClassCastException:EventKey无法强制转换为EventKey

这可能是类装入器问题;由于反序列化器和消费者 Bean 由不同的类加载器加载,您是否使用 Spring DevTools?

使用spry-kafka,可以通过显式创建消费者工厂并将反序列化器注入其中来避免这种情况。

使用 spring-cloud-stream(从版本 3.0.6 开始),您可以提供一个 ClientFactoryCustomizer bean 并注入反序列化程序实例(定义为 @Beans,以便它们使用相同的类加载器)。

或者,停止使用DevTools。

 类似资料:
  • 我使用的是带有Kafka活页夹和Avro的SpringCloudStream版本2.2.0。显然,一个不正确的记录被发布到Kafka主题中,导致所有消费者返回反序列化错误,并进行某种无限重试。 从技术上讲,应该有一种方法可以指定反序列化异常的策略。我可以找到一些不同的策略,如和,但它们适用于我在应用程序中不使用的Kafka流。如果有人能帮助我理解这里缺少什么,我将不胜感激。

  • 我使用spring数据来创建jpa和mongo。 附属国: spring版本是4.0。2.释放 SpringDataJPA版本是1.4。3.释放 spring数据mongodb版本为1.2。0.1释放 XML配置: 型号: 例外: 有人知道吗? 非常感谢。。

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我正在创建一个简单的Kafka Streaming应用程序。我的Producer正在为一个主题生成protobuf序列化消息,我在Kafka Streaming应用程序中使用该主题来处理消费者消息。我正在尝试使用在我的应用程序中。yml文件。我发现以下错误。 错误: 我的配置文件: 在日志中打印实际反序列化消息的处理方法: 请帮我解决这个问题。如何在Kafka Stream中使用protobuf反

  • 我正在使用dropwizard 1.1.0和java 8功能。我也在使用Immutables包。我在尝试将利润中心列表(字符串列表)从JSON转换为java等价物时,遇到了反序列化问题。 错误 找不到非具体集合类型的反序列化器[集合类型;类com.google.common.collect.ImMutableList,包含[简单类型,类java.lang.字符串]] 不可变Java类 不可变项将上