当前位置: 首页 > 知识库问答 >
问题:

如何检查avro模式注册表的使用情况

茹照
2023-03-14
 def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
    system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withProperty("auto.create.topics.enable", "true")

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(
      system,
      new StringDeserializer,
      new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("schema.registry.url", "http://localhost:8081")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

如果不是--如何修复?

共有1个答案

那宏大
2023-03-14

您使用了错误的类,因此您的属性可能会出现错误

您实际上需要在这里使用KafKaavroserializer生成器

new StringSerializer,
new ByteArraySerializer)

kafkaavrodeSerializer

new StringDeserializer,
new ByteArrayDeserializer)
 类似资料:
  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我有一个Kafka消费者配置了主题中的模式轮询,我想做的是在当前模式的基础上创建另一个Avro模式,并使用它水合数据,基本上我不需要50%的信息,需要编写一些逻辑来更改几个字段。这只是一个例子 从stream返回的事件相当复杂,所以我将一个较小的CustomObj建模为. avsc文件,并将其编译成java。当尝试使用CustomObj运行代码时,我想做的就是使用一个事件,然后将其反序列化为一个更

  • 如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。

  • 我正在考虑使用模式来验证Kafka主题的数据。我正在结合apache kafka探索spring云模式注册表。 如果我在阅读文档后理解正确。Spring云模式注册表仅支持avro模式!在avro pojos中,需要使用类路径上的. avsc文件生成pojos,并且有一个maven插件可以完成所需的工作。 问题: 如果我的POJO上有这样的自定义验证呢?我不想在我的Kafka消费者中使用avro模式

  • Spark是否有任何最佳实践来处理在Avro中使用模式注册表序列化的kafka流?尤其是对于Spark结构化流? 我在https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/src/main/scala/io/scalac/spark/AvroConsumer.scala找到了一个例子。但是我无法加载类。我在mvnrepos