当前位置: 首页 > 知识库问答 >
问题:

Kafka流:序列化回avro

苏昊英
2023-03-14

我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。

我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。

这是流类:

class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {

    private val logger = LogManager.getLogger(javaClass)

    fun process() {
        val streamsBuilder = StreamsBuilder()

        val avroSerde = GenericAvroSerde().apply {
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvro = SpecificAvroSerde<Movie>().apply{
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
                .stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))

        val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
            val movie = StreamMovie(
                    movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
                    director = movieAvro["director"].toString(),
            )
             KeyValue("${movie.movieId}", movie)
        }

        // This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object 
        movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))

        val topology = streamsBuilder.build()

        val props = Properties()
        props["bootstrap.servers"] = brokers
        props["application.id"] = "movies-stream"
        val streams = KafkaStreams(topology, props)
        streams.start()
    }
}

谢谢

共有1个答案

皮献
2023-03-14

结果流的类型是KStream

为什么要尝试使用specifiavroserde

比较https://github.com/confluentinc/kafka-streams-examples/blob/5.4.1-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

 类似资料:
  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。

  • 我对Kafka和克里奥很陌生。我一直在使用默认的Kafka序列化器处理Kafka上的简单字符串消息,但我试图使用Kryo序列化,但没有成功。 谁能解释一下或者给我看一个用kryo序列化(生产者和消费者)通过Kafka发送java对象的例子吗?

  • 我有一台装有Java 1.6的服务器。在这里,我需要使用Confluent的< code > KafkaAvroDeserializer 来反序列化avro消息。 问题是: 如果我使用Confluent-1.0(它与Java兼容 如果我使用Confluent-2.0或更高版本,它拥有一切,但它只与java兼容 在这种情况下我该怎么办? 为了比较: http://docs.confluent.io/

  • 我使用的是Spring Kafka 1.1.2-Spring Boot 1.5.0 RC版本,并且配置了一个自定义值serialiser/Deserialiser类,扩展/。这些类确实使用Jackson ObjectMapper,它可以通过构造函数提供。 是否可以从Spring上下文中注入ObjectMapper?我已经配置了一个ObjectMapper,我希望在序列化/反序列化程序中重用它。