当前位置: 首页 > 知识库问答 >
问题:

如何通过引用Kafka主题的另一个模式来更新Avro模式?

寿意远
2023-03-14

如果Kafka主题的Avro模式被用作另一个模式的参考,那么更新该模式的正确html" target="_blank">方法是什么?

例如,假设我们有两个Kafka主题:一个使用Avro模式用户{“type”:“record”,“namespace”:“test”,“name”:“User”,“fields”:[{“name”:“username”:“username”,“type”:“string”}]},另一个使用UserAction{“type”:“record”,“namespace”:“test”,“name”:“UserAction”,“fields”:[{“name”:“action”,“type”:“string”},{“name”:“User”,“type”:“test User”}}

然后我想给用户添加一个额外的字段——一个“姓氏”,所以它看起来像这样:。。。“字段”:[{“名称”:“用户名”,“类型”:“字符串”},{“名称”:“姓氏”,“类型”:[“字符串”,“空”],“默认值”:空}],空以使此更改兼容。要做到这一点,我可以更改Avro模式文件,使用Maven模式插件重新生成POJO,然后如果我将使用KafkaTemplate向第一个主题发送消息,则将更新模式,并且在主题上显示新字段。

问题是,如果我向第二个主题发送一条带有UserAction的消息,它仍然会引用旧的用户模式,没有“姓氏”字段,即使POJOs会正确地看到它。因此,发送的任何“姓氏”都不会存储在主题中,在消费者中会作为空值接收。

第二种方式是强制用户更新模式吗?

共有1个答案

濮阳
2023-03-14

虽然Confluent Schema Registry允许在注册时进行引用,但我认为它不会随着您只更改一个模型而动态更新。

相反,您可以定义一个模式“monorepo”,将模式更改打包并注册在一起。

例如,在Avro IDL中,您可以定义一个文件

record User {
  // fields here
}

record UserAction {
  User user;
  string action;
}

如果使用Avro-Maven pluginidl schemataaction,它将反映两个输出AVSC模式文件中的所有用户更改。

当Java模型被创建时,它将拥有所有必要的字段。不过,您需要分别更新依赖于这些模型的所有外部客户端。

 类似资料:
  • 我有一个简单的案例类: 我正在添加字段“name” java.util.NoSuchelementException:scala.collection.immutable.stream$empt$.head(stream.scala:1104)在scala.collection.immutable.stream$empt$.head(stream.scala:1102)在test.consumer

  • 我们正在使用Flink表API在Flink应用程序中使用一个Kafka主题。 当我们第一次提交应用程序时,我们首先从我们的自定义注册表中读取最新的模式。然后使用Avro模式创建一个Kafka数据流和表。我的数据序列化器的实现与汇合模式注册表的工作方式类似,方法是检查模式ID,然后使用注册表。因此我们可以在运行时应用正确的模式。

  • 使用avro的Kafka模式管理为我们提供了向后兼容性的灵活性,但我们如何处理方案中的突破性更改? 假设生产者A向消费者C发布消息M 假设消息M的方案发生了重大变化(例如名称字段现在被拆分为名字和姓氏),我们有了新方案M-new 现在我们正在部署producer A-New和Consumer C-New 问题是,在我们的部署过程完成之前,我们可以让生产者发布一条新消息M-new,消费者C(旧的那个

  • 问题内容: 我正在为约会应用程序构建Mongoose模式。 我希望每个文档都包含对它们所经历过的所有事件的引用,其中另一个是系统中具有自己模型的架构。如何在架构中对此进行描述? 问题答案: 您可以使用 人口 来描述它 填充是用其他集合中的文档自动替换文档中指定路径的过程。我们可以填充单个文档,多个文档,普通对象,多个普通对象或查询返回的所有对象。 假设您的事件模式定义如下: 为了显示如何使用填充,

  • 我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式? 下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到

  • 关于Avro序列化的消息是如何被Kafka和Schema Registry处理的,我想要了解的是,从这篇文章中,我了解到模式ID存储在每个消息中的可预测位置,因此我们似乎可以在同一个主题中拥有不同模式的消息,并且能够找到正确的模式,并基于此成功地反序列化它们。另一方面,我看到许多人似乎在使用“一个模式附加到一个主题”的表述,但这意味着每个主题都有一个模式。 那么哪一个是对的呢?我是否可以利用模式注