当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect-JSON转换器-JDBC接收器连接器-列类型JSON

司空镜
2023-03-14

用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。

数据库是Postgres,它支持JSON的列类型。

根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理

是否可以将数据字段的类型设置为JSON,然后将其存储在Postgres DB中,列类型为JSON。

schema = `{
"type":"struct",
"fields":[
    {"type":"string", "optional": false, "field":"id"},
    {"type":"string", "optional": false, "field":"data"}
]}`

样本数据有效载荷是

"payload": { "id": 10000, "data": {"hello":"world"} }

上面将数据存储为文本,并期望列在Postgres中为文本类型。如果Postgres上的列是JSON类型,那么JDBCSink连接器将抛出错误。

在Postgres上使用JSON类型将有助于在JSON字段等上创建索引。是否可以适当地使用JSON转换器和JDBC Sink转换器来存储列类型为JSON的记录。

共有2个答案

锺离韬
2023-03-14

JDBC Sink连接器不支持PostgreSQL json、jsonb类型。它支持原始类型、日期时间的数量。

在以下页面中,您可以找到将模式类型映射到数据库类型(PostgreSQL)https://docs . confluent . io/5 . 1 . 0/connect/Kafka-connect-JDBC/sink-connector/index . html

虽然 JDBC 源连接器在某些部分支持 json、jsonb 类型 - 这种类型的列不会映射到 STRUCT,但会映射到 STRING 类型。

潘星阑
2023-03-14

使用value.converter.schema。enable=true,并像这样发送JSON数据(将模式作为每条消息的一部分,并用实际消息数据更新payload部分),它应该与JDBCSink一起工作。

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": false,
            "field": "id"
        }, {
            "type": "struct",
            "name": "data",
            "optional": false,
            "fields": [{
               "type": "string",
               "name": "hello",
               "optional":false
            }]
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "id": 10000,
        "data": {"hello":"world"}
    }
}

请注意,字段

或者你可以考虑转换你的客户使用Avro,为自己节省一些网络带宽。

 类似资料:
  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

  • 使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息: 但是出现错误: Jsonlint说Json是有效的。我在 kafka 配置中保留了 json 。有什么指示吗?

  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 我有一个kafka主题,它是使用Debezium mysql source connector从mysql数据库获取数据,下面是其中一条消息的格式: 我想使用jdbc接收器连接器将和(从对象/行)列推送到另一个数据库中,表模式为,对kafka来说,我不知道: > 我如何才能只提取这些字段,从消息中推送而忽略其他字段? 如何将before、after字段转换为字符串/序列化格式? 如何从对象提取?(

  • 我使用Kafka和Kafka Connect将MS SQL Server数据库复制到MySQL,使用debezium SQL Server CDC源连接器和汇合的JDBC汇连接器。“auto.create”设置为true,接收连接器确实创建了表,但某些数据类型不匹配。在SQL Server中,我有 但在 MySQL 中,它创建了以下内容: 忽略消息,这是我在 SMT 中添加的额外字段。 名字、姓氏