当前位置: 首页 > 知识库问答 >
问题:

kafka streams groupby(akka aggr)操作合流模式注册表中的Avro模式

拓拔泓
2023-03-14

我使用schema registry为所有带有Kafka Streams的应用程序创建模式注册表。我们的一个流进行聚合,我想对聚合对象使用schema registry,如下所示:

stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(timeWindowDuration, chronoUnit)))
.aggregate(initAggreggatedObj(), aggregateData(), Materialized.with(Serdes.String(), avroSerde))

但在向schema注册表添加schema时,我们需要按主题名定义schema。在流上聚合的情况下,这是不可能的,因为主题名称是由流拓扑生成的。

问题是是否有某种方法可以为任何主题创建模式,以便任何流或任何其他解决方案都可以使用?

共有1个答案

越鸿才
2023-03-14

Thx,OneCricketeer,你的回答是正确的。有auto。登记schemas参数,默认为true

 类似资料:
  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题

  • 我正在尝试设置一个Beam管道,以便使用python API读取Kafka的内容。我能够设置消费者配置和主题。如何更新管道以使用合流模式注册表并定义Avro消息值反序列化器?

  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我希望即使服务器重新启动,也能保持一个具有固定id的模式。 是否可以在模式注册表中保存模式,以便在服务器崩溃后使用相同的id? 否则,是否有可能在模式注册表服务器启动时用固定的id硬编码一个模式?

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。