当前位置: 首页 > 知识库问答 >
问题:

无法使用ConFluent Schema注册表和Spring Cloud Stream反序列化Avro消息

明宜年
2023-03-14

我正在使用Spring Cloud Stream和Confluent Schema Registry注册Avro模式。

架构注册成功。但是,当我的流侦听器接收到消息时,负载仍然以字节为单位。

这是我的财产。

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8081
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamic-schema-generation-enabled=true
spring.cloud.stream.schemaRegistryClient.cached=true
spring.cloud.stream.schema.avro.schemaLocations=classpath*:schemas/*.avsc
spring.cloud.stream.bindings.input.contentType=application/*+avro

在接收消息时,我注意到“AbstractAvroMessageConverter”中的“convertFromInternal”从未被调用,而这应该是用来解码消息的。

共有1个答案

佘辰龙
2023-03-14

您使用的是什么版本的SCSt?在1.3上与confluent schema registry server集成时,我们修复了几个问题。

此外,您不需要设置输入contentType,解析通过包含应用程序/vnd*avro的标题进行,这是转换器启动并从服务器找到正确模式的提示。您仅在输出通道上设置该contentType,它将被替换为正确的版本,例如:<代码>应用程序/vnd。使用者v1 avro

 类似资料:
  • 我尝试将我的自定义类型的< code>ProducerRecord发送到Kafka,但我收到错误消息: 我在schema:GET中设置了schema 回应: 得到 回答 } 这是我的Scala类: Kafka制作人的部分: 我错过了什么?我看到我可以为我的类实现SpecificRecord,但在我阅读的书/教程中,我没有看到这一点。谢谢 编辑:固定类名

  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。

  • 我正在创建一个avro类,它包含一个字符串和一个映射作为字段。我可以通过maven生成avro类,并且能够在localhost:8081中创建注册表

  • 我是flink和kafka的新手。我正在尝试使用合流模式注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和kafka。此外,在运行代码之前已经创建了“测试”主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,该代码执行以下操作: 运行flink可执行jar的

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我正在尝试用Avro序列化器和模式注册表向Kafka发送一个对象。 这里是一个简化的代码: 我假设架构是从注册表“幕后”读取的,对象(用户)是序列化的,但我得到了下面的错误。 我遗漏了什么? 我必须显式读取架构并发送GenericRecord吗? org.apache.kafka.common.errors.SerializationException:序列化Avro消息 时出错,原因是:java