当前位置: 首页 > 知识库问答 >
问题:

Kafka连接JDBC错误,尽管给出了模式和有效负载

龚同
2023-03-14

当我运行kafka JDBC连接器到PSQL时,我收到以下错误:

JSON converter with schemas . enable需要“schema”和“payload”字段,不能包含其他字段。如果试图反序列化普通JSON数据,请在转换器配置中设置schemas.enable=false。

但是,我的主题包含以下消息结构,并添加了一个架构,就像在线显示的那样:

Rowtime: 2022/02/04 12:45:48.520 Z, key:, value:{"架构":{"类型":"结构","字段":[{"类型":"int","字段":"ID","可选": false},{"类型":"日期","字段":"日期","可选": false},{"类型":"varchar","字段":"ICD","可选": false},{"类型":"int","字段":"CPT","可选": false},{"类型":"双","字段":"成本","可选": false}],"可选": false,"名称":"test"},"有效载荷":{"ID":"24427934","日期":"2019-05-22","ICD":"883.436","CPT":"60502","成本":"1374.36"}}",分区: 0

我对连接器的配置是:

 curl -X PUT http://localhost:8083/connectors/claim_test/config \
    -H "Content-Type: application/json" \
    -d '{
     "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "connection.url":"jdbc:postgresql://localhost:5432/ae2772",
     "key.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable":"true",
     "topics":"test_7",
     "auto.create":"true",
     "insert.mode":"insert"
    }'

经过一些更改后,我现在得到以下消息:

WorkerSinkTask{id=claim_test}在偏移量0和时间戳1644005137197处转换主题“test_9”分区0中的消息值时出错:未知架构类型:int

共有1个答案

子车成和
2023-03-14

int 不是有效的架构类型。应为 int8int16int32int64

同样,datevarcharDouble也无效。

JSON中使用的类型不同于Postgres或任何特定于SQL的类型(日期应该转换为Unix Epoch int64时间或成为< code>string)。

您可以在这里找到支持的模式类型:https://github . com/Apache/Kafka/blob/trunk/connect/API/src/main/Java/org/Apache/Kafka/connect/data/schema . Java

 类似资料:
  • 我正在使用 Confluent JDBC-Source 连接器运行以下作业: 我有一个类似的Kafka-Connect作业在同一个数据库和同一个用户上成功运行,但使用另一个较小的表。所以连接不是问题。 在运行作业的 Kafka-connect 服务器上的日志中,我看到以下内容: 所以,没什么可说的。运行此作业的服务器现在无响应,并且不响应 REST 调用。有什么想法吗?

  • 我正在使用合流kafka connect jdbc源将mysql表中的记录推送到我的kafka主题,但似乎日期列被转换为纪元时间。 这是我的配置: kafka主题中的输出: 我也在类似于“select from_unixtime(updated _ on)from temp”的查询中尝试了from _ unixtime(),但是那不行。 有没有办法推到YYYY-MM-DD HH:MM:SS格式的K

  • 我有一个表达式组件,它创建一个字符串数组类型的对象,用于处理对 salesforce 的删除请求。但是,每次超出表达式时,流都会失败。 我在控制台上看到的错误是: 找不到转换器将"SimpleDataType{type=[Ljava.lang.String;, mimeType='/'}" 转换为"Grou 请告知是否有其他方法来进行此操作。

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

  • 我正在尝试将MySQL与Kafka Connect连接,并且出现了许多错误。我正在共享我的connect-standalone.properties和mysql-jdbc-connector.properties,并显示错误。我的 Kafka 和 MySQL 在不同的集群中,我使用的是融合连接器,但不是在融合接口中。我下载了4.1.0 JDBC MySQL融合连接器。 MySQL-JDBC-con

  • 消息:通信链路失败上次成功发送到服务器的数据包是0毫秒前。驱动程序没有从服务器接收到任何数据包。SQLState:08S01错误代码:0 我不明白为什么!:(