用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。
数据库是Postgres,它支持JSON的列类型。
根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理
是否可以将数据字段的类型设置为JSON,然后将其存储在Postgres DB中,列类型为JSON。
schema = `{
"type":"struct",
"fields":[
{"type":"string", "optional": false, "field":"id"},
{"type":"string", "optional": false, "field":"data"}
]}`
样本数据有效载荷是
"payload": { "id": 10000, "data": {"hello":"world"} }
上面将数据存储为文本,并期望列在Postgres中为文本类型。如果Postgres上的列是JSON类型,那么JDBCSink连接器将抛出错误。
在Postgres上使用JSON类型将有助于在JSON字段等上创建索引。是否可以适当地使用JSON转换器和JDBC Sink转换器来存储列类型为JSON的记录。
JDBC Sink连接器不支持PostgreSQL json、jsonb类型。它支持原始类型、日期时间的数量。
在以下页面中,您可以找到将模式类型映射到数据库类型(PostgreSQL)https://docs . confluent . io/5 . 1 . 0/connect/Kafka-connect-JDBC/sink-connector/index . html
虽然 JDBC 源连接器在某些部分支持 json、jsonb 类型 - 这种类型的列不会映射到 STRUCT,
但会映射到 STRING
类型。
使用value.converter.schema。enable=true
,并像这样发送JSON数据(将模式作为每条消息的一部分,并用实际消息数据更新payload
部分),它应该与JDBCSink一起工作。
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "struct",
"name": "data",
"optional": false,
"fields": [{
"type": "string",
"name": "hello",
"optional":false
}]
}],
"optional": false,
"name": "foobar"
},
"payload": {
"id": 10000,
"data": {"hello":"world"}
}
}
请注意,字段
或者你可以考虑转换你的客户使用Avro,为自己节省一些网络带宽。
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连
我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?
使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息: 但是出现错误: Jsonlint说Json是有效的。我在 kafka 配置中保留了 json 。有什么指示吗?
我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误
我有一个kafka主题,它是使用Debezium mysql source connector从mysql数据库获取数据,下面是其中一条消息的格式: 我想使用jdbc接收器连接器将和(从对象/行)列推送到另一个数据库中,表模式为,对kafka来说,我不知道: > 我如何才能只提取这些字段,从消息中推送而忽略其他字段? 如何将before、after字段转换为字符串/序列化格式? 如何从对象提取?(
我使用Kafka和Kafka Connect将MS SQL Server数据库复制到MySQL,使用debezium SQL Server CDC源连接器和汇合的JDBC汇连接器。“auto.create”设置为true,接收连接器确实创建了表,但某些数据类型不匹配。在SQL Server中,我有 但在 MySQL 中,它创建了以下内容: 忽略消息,这是我在 SMT 中添加的额外字段。 名字、姓氏