def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
system,
new StringSerializer,
new ByteArraySerializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withProperty("auto.create.topics.enable", "true")
def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
ConsumerSettings(
system,
new StringDeserializer,
new ByteArrayDeserializer)
.withBootstrapServers("localhost:9092")
.withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
.withProperty("key.converter.schema.registry.url", "http://localhost:8081")
.withProperty("value.converter.schema.registry.url", "http://localhost:8081")
.withProperty("schema.registry.url", "http://localhost:8081")
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
如果不是--如何修复?
您使用了错误的类,因此您的属性可能会出现错误
您实际上需要在这里使用KafKaavroserializer
生成器
new StringSerializer,
new ByteArraySerializer)
和kafkaavrodeSerializer
new StringDeserializer,
new ByteArrayDeserializer)
我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化
我有一个Kafka消费者配置了主题中的模式轮询,我想做的是在当前模式的基础上创建另一个Avro模式,并使用它水合数据,基本上我不需要50%的信息,需要编写一些逻辑来更改几个字段。这只是一个例子 从stream返回的事件相当复杂,所以我将一个较小的CustomObj建模为. avsc文件,并将其编译成java。当尝试使用CustomObj运行代码时,我想做的就是使用一个事件,然后将其反序列化为一个更
如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。
我正在考虑使用模式来验证Kafka主题的数据。我正在结合apache kafka探索spring云模式注册表。 如果我在阅读文档后理解正确。Spring云模式注册表仅支持avro模式!在avro pojos中,需要使用类路径上的. avsc文件生成pojos,并且有一个maven插件可以完成所需的工作。 问题: 如果我的POJO上有这样的自定义验证呢?我不想在我的Kafka消费者中使用avro模式
Spark是否有任何最佳实践来处理在Avro中使用模式注册表序列化的kafka流?尤其是对于Spark结构化流? 我在https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/src/main/scala/io/scalac/spark/AvroConsumer.scala找到了一个例子。但是我无法加载类。我在mvnrepos