我们正在使用Flink表API在Flink应用程序中使用一个Kafka主题。
当我们第一次提交应用程序时,我们首先从我们的自定义注册表中读取最新的模式。然后使用Avro模式创建一个Kafka数据流和表。我的数据序列化器的实现与汇合模式注册表的工作方式类似,方法是检查模式ID,然后使用注册表。因此我们可以在运行时应用正确的模式。
在大多数情况下,你不需要改变任何东西来使它工作。
在Avro中,有读写器模式的概念。Writer模式是用来生成Avro记录的模式,它被编码到有效负载中(在大多数情况下作为id)。
应用程序使用Reader模式来理解数据。如果您进行特定的计算,您将使用Avro记录的一组特定字段。
如果您真的想要丰富应用程序中的模式,那么这种方法是不合适的;例如,您总是希望添加一个字段calculated
并返回所有其他字段。那么新添加的字段将不会被拾取,因为实际上您的阅读器模式会更改。在这种情况下,您需要重新启动或使用通用记录架构。
我有一个简单的案例类: 我正在添加字段“name” java.util.NoSuchelementException:scala.collection.immutable.stream$empt$.head(stream.scala:1104)在scala.collection.immutable.stream$empt$.head(stream.scala:1102)在test.consumer
如果Kafka主题的Avro模式被用作另一个模式的参考,那么更新该模式的正确方法是什么? 例如,假设我们有两个Kafka主题:一个使用Avro模式用户,另一个使用UserAction。 然后我想给用户添加一个额外的字段——一个“姓氏”,所以它看起来像这样:,空以使此更改兼容。要做到这一点,我可以更改Avro模式文件,使用Maven模式插件重新生成POJO,然后如果我将使用KafkaTemplate
我们使用Apache Kafka(不是confluent Kafka)0.10。我们想用Kafka设置AVRO模式。我有如下的avro模式。 序列化消息, 这正像预期的那样起作用。 但是,希望在主题级别设置一个Avro模式,这样,如果消息不符合Avro模式,主题将拒绝消息。 不管怎么说,我可以用阿帕奇Kafka0.10做到这一点。
假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?
我有一个连续接收流插入的表(可能每秒数千个)。 我对使用更新功能(通过API调用)添加列感兴趣。我可以在数据仍在插入时调用Update将列添加到现有表中,而无需担心数据丢失吗? 作为参考,这是我计划用于向表中添加列的代码:
我正试图了解更多关于我们在Kafka主题中使用的Avro模式的信息,我对这一点相对来说比较陌生。 我想知道是否有一种方法可以在特定情况下发展模式。我们用一个不能为null的新字段或任何默认值来更新模式,因为这些新字段是标识符。解决这个问题的方法是创建新主题,但是有没有更好的方法来改进现有模式?