当前位置: 首页 > 知识库问答 >
问题:

无法使用Spring Cloud流反序列化Kafka流中的数据

邹玄裳
2023-03-14

我正在创建一个简单的Kafka Streaming应用程序。我的Producer正在为一个主题生成protobuf序列化消息,我在Kafka Streaming应用程序中使用该主题来处理消费者消息。我正在尝试使用valueSerde:io.confluent.kafka.streams.serdes.protobuf反序列化消息。KafkaProtobufSerde在我的应用程序中。yml文件。我发现以下错误。

错误:

org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3] 

我的应用程序。yml配置文件:

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          consumer:
            max-attempts: 3
            valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
            back-off-initial-interval: 100
            retryable-exceptions:
              javax:
                validation:
                  ValidationException: false
          destination: MYINPUT-TOPIC
          group: consumer-group
          concurrency: 2
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            schema-registry-u-r-l: http://localhost:8081
            auto-offset-reset: "earliest"
            configuration:
              commit-interval-ms: 100

在日志中打印实际反序列化消息的处理方法:

@Component
@Slf4j
public class ProcessStream {

//Here below Timer object is Protobuf's auto-generated class, I am using it to deserialize messages. 
//I'm getting byte Aarry on this method when I'm debugging it.  
    @Bean
    public Consumer<KStream<String, Timer>> process() {
        return (InputStream) -> {
            InputStream.foreach((k,v) -> log.info(String.format("key: %s, value: %s",k, v)));
        };
    }
}

请帮我解决这个问题。如何在Kafka Stream中使用protobuf反序列化消息?

共有1个答案

家经纶
2023-03-14

你需要使用一个 protobuf Serde 来包装一个合适的 protobuf 反序列化器。Confluent schema registry 提供了一个 protobuf Serde 实现。有关详细信息,请参阅此处。我还没有测试过这个特定的实现,但看起来应该可以工作。如果您正在使用它(或自定义的 protobuf serde 实现),那么您只需提供该类型的 bean 即可在 Spring Cloud Stream 应用程序中注册它。见下文。

@Bean
public KafkaProtobufSerde<Timer> kafkaProtobufSerde() {
}

Spring Cloud Stream中的Kafka Streams绑定器将检测此bean并与您的消费者类型匹配。

根据以下评论中共享的示例应用程序进行更新

在您的配置中进行以下更改后,我能够运行您的示例应用程序:

spring:
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          group: consumer-group
          concurrency: 1
          headerMode: none
          destination: MESSAGING-TIMER-EXAMPLE
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://localhost:8081

您需要< code > Kafka . streams . binder 前缀。在做出更改后,我能够启动应用程序而不会出现任何错误。我在启动日志中看到了这些。

2021-07-29 15:10:42.995  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Key Serde used for process-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
2021-07-29 15:10:42.996  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Value Serde used for process-in-0: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
...
...
2021-07-29 15:10:43.629  INFO 65137 --- [           main] c.j.ListenerMessagingStreamsApplication  : Started ListenerMessagingStreamsApplication in 1.836 seconds (JVM running for 2.305)

我注意到您将yaml配置文件命名为< code > application-local . yml 。如果您正在使用这个属性,那么请确保您在运行时设置了这个属性。否则,将该文件重命名为< code>application.yml。

 类似资料:
  • 问题内容: 关于如何解决此错误的任何想法?我在Hibernate中使用Spring JPA。下面的必要详细信息。 实体类别1: 用户的实体类: 和stacktrace: stacktrace非常大。我将下面的整个stacktrace粘贴以供参考,并在此处粘贴快速TL; DR: 完整的堆栈跟踪如下: 更新: 我将Ways实体类更新为以下内容,但仍然出现错误: 问题答案: 当无法反序列化某些内容时,就

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢

  • 我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?

  • 你能帮我理解这个摘录,从Kafka流留档: 在可能正在处理多个主题分区的流任务中,如果用户将应用程序配置为不等待所有分区都包含一些缓冲数据,并从具有最小时间戳的分区中选取以处理下一条记录,则稍后在为其他主题分区提取某些记录时,它们的时间戳可能比从另一个主题分区获取的已处理记录小。 我不明白其中的逻辑:如果您选择具有最小时间戳的分区,为什么以后会获得较小的时间戳?