当前位置: 首页 > 知识库问答 >
问题:

Kafka、Avro和图式注册表

马天逸
2023-03-14

我有一个Kafka消费者配置了主题中的模式轮询,我想做的是在当前模式的基础上创建另一个Avro模式,并使用它水合数据,基本上我不需要50%的信息,需要编写一些逻辑来更改几个字段。这只是一个例子

    val consumer: KafkaConsumer<String, GenericRecord>(props) = createConsumer()
    while (true) {
        consumer.poll(Duration.ofSeconds(10).forEach {it ->
            println(it.value())
        }
    }

从stream返回的事件相当复杂,所以我将一个较小的CustomObj建模为. avsc文件,并将其编译成java。当尝试使用CustomObj运行代码时,我想做的就是使用一个事件,然后将其反序列化为一个更小的对象,只包含选定的字段。

return KafkaConsumer<String, CustomObj>(props)

这不起作用,不确定如何使用GenericRecord中的CustomObj反序列化它?让我补充一下,我没有任何访问流或其配置的权限,只能从中使用。

共有1个答案

印嘉泽
2023-03-14

在Avro中,读取器模式需要与写入器模式兼容。通过给出较小的对象,您提供了不同的阅读器模式

不可能直接反序列化为输入数据的子集,因此必须解析较大的对象并将其映射到较小的对象(这不是反序列化的作用)

 类似资料:
  • 我正在考虑使用模式来验证Kafka主题的数据。我正在结合apache kafka探索spring云模式注册表。 如果我在阅读文档后理解正确。Spring云模式注册表仅支持avro模式!在avro pojos中,需要使用类路径上的. avsc文件生成pojos,并且有一个maven插件可以完成所需的工作。 问题: 如果我的POJO上有这样的自定义验证呢?我不想在我的Kafka消费者中使用avro模式

  • 我使用的是Azure HDInsight的托管Apache Kafka解决方案,因为不幸的是Azure上没有托管汇流Kafka解决方案。是否可以运行汇合模式注册表并将其连接到HDInsight Apache Kafka集群的代理? 我希望只在单个VM上安装模式注册表,然后使用schema-registry.properties文件中的这一行,将其指向HDInsight集群的代理列表: kafkas

  • 我已经设置了Spring Cloud Stream中提供的Spring Avro模式注册表,以便在RabbitMQ中使用。我看到的大多数示例都使用Maven Avro插件从模式资源文件生成Java类。然后在架构注册表中注册架构文件。我的理解是,此注册表允许消息仅通过对已注册架构的引用进行SERDE,而不是在消息中包含整个架构。我不明白的是,在设计时,这些模式文件是如何在所有服务之间分发的,以生成J

  • 我有一个关于使用Kafka和主题(Kafka代理)和主题(Schema注册表)的不同名称设置流处理器的问题。 首先,任何操作似乎都可以与 Kafka 代理和模式注册表一起工作,但是如果处理器收到该事件,则模式注册表将魔术开始。 而不是将abc作为主题发送到模式注册表abc。bla将被发送。架构注册表的回答为“未找到”。 预期:localhost:8081/subjects/ABC/versions

  • 我使用schema registry为所有带有Kafka Streams的应用程序创建模式注册表。我们的一个流进行聚合,我想对聚合对象使用schema registry,如下所示: 但在向schema注册表添加schema时,我们需要按主题名定义schema。在流上聚合的情况下,这是不可能的,因为主题名称是由流拓扑生成的。 问题是是否有某种方法可以为任何主题创建模式,以便任何流或任何其他解决方案都

  • 我正在尝试设置一个Beam管道,以便使用python API读取Kafka的内容。我能够设置消费者配置和主题。如何更新管道以使用合流模式注册表并定义Avro消息值反序列化器?