我的目标是从HTTP源获取JSON数据并使用AVRO序列化将其存储在Kafka主题中。
使用Kafka Connect和一个HTTP源连接器以及一堆SMT,我成功地创建了一个连接数据结构,当使用StringConverter写入主题时,它是这样的:
结构{base=stations,cod=200,coord=Struct{lat=54.0,lon=9.0},dt=163210605}
因此,JSON被成功解析为结构,我可以使用SMT操作单个元素。接下来,我在Confluent schema注册表中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到Confluent AVRO转换器,使用“value.converter”:“io.Confluent.connect.AVRO.AvroConverter”
。
我收到的不是预期的序列化,而是一条错误消息:
org.apache.kafka.common.errors.SerializationException:错误序列化Avro消息引起的:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault
只要我用ReplaceField删除嵌套结构,或者用flatte简化结构,AVRO序列化就会像一个魔咒一样工作。因此,转换器似乎无法处理嵌套结构。
当您拥有嵌套元素并希望它们被序列化而不是将JSON存储为字符串并尝试在消费者或其他地方处理对象创建时,正确的方法是什么?这在Kafka Connect中可能吗?
从JSON字符串创建结构元素可以通过不同的方式实现。最初,使用SMT ExpandJson是为了简单。然而,它没有创建足够命名的结构,因为它没有一个模式可供使用。这就是AVRO序列化程序使用泛型类io时产生初始错误消息的原因。汇合的。连接阿夫罗。对于这些结构,ConnectDefault
,如果存在多个结构,则存在歧义,这会引发异常。
另一个似乎做同样事情的SMT是Json模式,它有一个记录在案的FromJson转换。它确实接受模式,因此绕过了ExpandJson将嵌套元素解析为泛型类型的问题。不过,现在接受的是JSON模式,而到AVRO全名的映射是通过将“properties”作为名称空间并复制字段名来实现的。在本例中,您将得到属性。coord
作为内部元素的全名。
例如,将以下JSON模式传递给SMT时:
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"coord": {
"type": "object",
"properties": {
"lon": {
"type": "number"
},
"lat": {
"type": "number"
}
},
"required": [
"lon",
"lat"
]
},
...
}
它生成的AVRO模式(因此在模式注册表中查找)是:
{
"type": "record",
"fields": [
...
{
"name": "coord",
"type": {
"type": "record",
"name": "coord",
"namespace": "properties",
"fields": [
{
"name": "lat",
"type": "double"
},
{
"name": "lon",
"type": "double"
}
],
"connect.name": "properties.coord"
}
},
...
}
理论上,如果您在第二层有另一个带有coord
元素的架构,它将获得相同的全名,但由于这些不是架构注册表中需要引用的单个条目,因此不会导致冲突。无法从JSON Schema控制AVRO记录的命名空间有点遗憾,因为感觉就快到了,但我还没有能够深入挖掘以提供解决方案。
建议的SMT SetSchemaMetadata(参见问题的第一个答案)在这个过程中可能很有用,但它的文档与AVRO命名约定有点冲突,因为它在一个示例中显示了顺序值
。它将试图找到一个架构,其中包含一个以该名称作为根元素的AVRO记录,并且由于“-”在AVRO名称中是非法字符,因此会出现一个错误。但是,如果使用了根元素的正确名称,SMT会做一些非常有用的事情:它的RestService
类查询模式注册表以找到匹配的模式,失败时会显示一条消息,打印出需要创建的确切模式定义,因此不必记住所有转换规则。
因此,原始问题的答案是:是的,可以通过Kafka连接实现。如果你愿意,这也是最好的方式
如果在数据摄取后进行转换是一种选择,那么ksqlDB的de、re和序列化功能似乎相当强大。
问题内容: 我有一个Vendor对象,可以从一个单独的“ vendor” json序列中反序列化,但是我想将此序列反序列化为一个,我只是想不出如何让Jackson合作。有小费吗? 问题答案: 您的数据存在问题,因为您的数组中有内部 包装 对象。想必你的对象被设计成手柄,,,但每次的多个对象也都包裹在一个对象与单一属性。 我假设您正在使用Jackson 数据绑定 模型。 如果是这样,那么有两件事要考
我想创建一个函数来返回成功操作的一系列期货的结果。我遇到的问题是返回类型为Unit,并且未来函数正在完成,而无需等待嵌套的未来序列完成。我尝试过不使用on完成函数,而是使用map或平面图,但没有成功。我还想避免使用wait 这个后来会这么叫
我有一个使用Spring Cloud Stream和Spring Kafka的应用程序,它处理Avro消息。该应用程序运行良好,但现在我想添加一些错误处理。 目标是:捕获反序列化异常,使用异常详细信息原始Kafka消息自定义上下文信息构建新对象,并将此对象推送到专用Kafka主题。基本上是DLQ,但原始消息将被截获并修饰。 问题是:虽然我可以拦截异常,但我不知道如何从Kafka那里获取原始消息(下
考虑: 如果我们序列化Foo(),输出是: 我想要: 最干净的方法是什么?
我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于
我正在努力学习颤振,但我在JSON序列化上被卡住了。我在YouTube和Flitter文档中学习了一些教程,但我在序列化方面遇到了一些困难。你能帮我一点忙吗,这是为了教育目的,所以我更感兴趣的是背后的理论,而不是解决方案本身,然而,即使只有解决方案,我认为我可以尝试理解这个过程。我应该提到,我知道这一点,但在我的例子中,数据中有嵌套对象,这让我感到困惑。 作为响应数据的样本,它基本上是一个商店,每