当前位置: 首页 > 知识库问答 >
问题:

具有 JSON 架构的 Kafka jdbc 接收器连接器不起作用

漆雕修德
2023-03-14

使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "msg"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "id": 222,
        "msg": "hi"
    }
}

但是出现错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Jsonlint说Json是有效的。我在 kafka 配置中保留了 json schemas.enable=true。有什么指示吗?

共有3个答案

鲜于凯歌
2023-03-14

我最近遇到了同样的问题,我花了多次重试才弄清楚遗漏了什么:

以下设置对我有用:

key.converter.schemas.enable=false
value.converter.schemas.enable=true

此外,请确保该表以前存在于数据库中,并且连接器不应尝试创建表。设置auto.create=false

屠浩
2023-03-14

为了使用JDBC接收器,您的流消息必须有一个模式。这可以通过使用Avro和Schema Registry来实现,也可以通过使用JSON和Schema来实现。如果在最初运行源属性文件后配置了< code>schemas.enable=true,您可能需要删除主题,重新运行接收器,然后再次启动源端。

例:

接收器属性文件

name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true

以及示例工作配置文件< code > connect-avro-standalone . properties :

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

plugin.path=share/java

并执行

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties
欧阳俊晖
2023-03-14

您需要告诉 Connect 您的架构已嵌入到您正在使用的 JSON 中。

您有:

value.converter=org.apache.kafka.connect.json.JsonConverter 

但还需要:

value.converter.schemas.enable=true
 类似资料:
  • 这些是我的道路: 动态架构操作输出示例: 完整示例:https://github . com/Microsoft/PowerPlatformConnectors/blob/070009 ba 5681 FD 57 ee 6 cc 61d 2 a 380123712 a 088 f/certified-connectors/ticketing . events/API definition . sw

  • 问题内容: 好的,我已经尝试了Stack上的所有解决方案,但没有任何效果。我当前的方法从MainActivity注册了“ SmsListener”接收器。我要做的就是初始化onReceive方法。没有错误;它根本不是在广播。我究竟做错了什么?在此处粘贴适用的代码。可能需要的其他任何东西都可以问。 更新:这是一个未解决的类似问题,当 我在Android6.0.1下测试我正在测试的GoogleHang

  • 我尝试使用以下配置启动JDBC接收器连接器: 但当连接器处于运行状态时,没有任务正在运行: 我多次面对这个问题,但我很困惑,因为它是随机发生的。我的问题与这个问题非常相似。如果有任何帮助,我将不胜感激! 更新。11/04/2019(不幸的是,现在我只有INFO级别日志) 最后,经过几次尝试,我通过更新现有连接器的配置crm_data-sink_db_hh启动了正在运行任务的连接器: 日志: 更新。

  • 用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我开始对kafka绝望了。对于一个私人项目,一家公司向我发送了一个kafka流。经过长时间的尝试,我终于设法连接到引导服务器并接收到第一条消息。但没有反序列化。目前数据格式如下:4868fa8 该公司以avro格式发送密钥和值,我也获得了几个模式URL。但是我不能正确地使用它们,这样我就能得到可读的数据。不管我怎么输入,它总是出错。只有当我在没有任何模式的情况下进行检索时,我才能得到如上所述的消息