当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect-JDBC Avro Connect如何在注册表中定义自定义模式

王飞虎
2023-03-14

我正在学习kafka connect的教程,我想知道是否有可能为数据来自MySql表的主题定义一个自定义的模式注册表。

我在我json/connect配置中找不到定义它的地方,而且我不想在创建模式后创建一个新版本。

我的MySql表称为站有这个模式

Field          | Type        
---------------+-------------
code           | varchar(4)  
date_measuring | timestamp   
attributes     | varchar(256)

其中,属性包含 Json 数据而不是字符串(我必须使用该类型,因为属性的 Json 字段是可变的。

我的连接器是

{
  "value.converter.schema.registry.url": "http://localhost:8081",
  "_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "name": "jdbc_source_mysql_stations",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": [
    "ValueToKey"
  ],
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": [
    "code",
    "date_measuring"
  ],
  "connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
  "connection.user": "confluent",
  "connection.password": "**************",
  "table.whitelist": [
    "stations"
  ],
  "mode": "timestamp",
  "timestamp.column.name": [
    "date_measuring"
  ],
  "validate.non.null": "false",
  "topic.prefix": "mysql-"
}

并创建该模式

{
  "subject": "mysql-stations-value",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}

其中“属性”字段当然是一个字符串。与我将应用它的其他模式不同。

    {
  "fields": [
    {
      "name": "code",
      "type": "string"
    },
    {
      "name": "date_measuring",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "connect.version": 1,
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "attributes",
      "type": {
        "type": "record",
        "name": "AttributesRecord",
        "fields": [
          {
            "name": "H1",
            "type": "long",
            "default": 0
          },
          {
            "name": "H2",
            "type": "long",
            "default": 0
          },
          {
            "name": "H3",
            "type": "long",
            "default": 0
          },          
          {
            "name": "H",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Q",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P1",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P2",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P3",
            "type": "long",
            "default": 0
          },                    
          {
            "name": "P",
            "type": "long",
            "default": 0
          },          
          {
            "name": "T",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Hr",
            "type": "long",
            "default": 0
          },          
          {
            "name": "pH",
            "type": "long",
            "default": 0
          },          
          {
            "name": "RX",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Ta",
            "type": "long",
            "default": 0
          },  
          {
            "name": "C",
            "type": "long",
            "default": 0
          },                  
          {
            "name": "OD",
            "type": "long",
            "default": 0
          },          
          {
            "name": "TU",
            "type": "long",
            "default": 0
          },          
          {
            "name": "MO",
            "type": "long",
            "default": 0
          },          
          {
            "name": "AM",
            "type": "long",
            "default": 0
          },          
          {
            "name": "N03",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P04",
            "type": "long",
            "default": 0
          },          
          {
            "name": "SS",
            "type": "long",
            "default": 0
          },          
          {
            "name": "PT",
            "type": "long",
            "default": 0
          }          
        ]
       }
     }    
  ],
  "name": "stations",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}

有什么建议吗?如果不可能,我想我必须创建一个KafkaStream来创建另一个主题,即使我会避免它。

提前感谢!

共有1个答案

柴华灿
2023-03-14

我不认为您问的是关于使用“自定义”注册中心的任何事情(您会用表示您正在使用哪个注册中心的两行代码来问),而是在从数据库中提取记录后,您如何解析数据/应用模式

您可以编写自己的Transform,也可以使用Kstream,这实际上是这里的主要选项。有一个SetSchemaMetadata转换,但我不确定它是否能满足您的要求(将字符串解析为Avro记录)

或者,如果您必须将JSON数据推入单个数据库属性,也许您不应该使用Mysql,而应该使用具有更灵活html" target="_blank">数据约束的文档数据库。

否则,您可以使用 BLOB 而不是 varchar 并将二进制 Avro 数据放入该列中,但这样您仍然需要一个自定义的反序列化程序来读取数据

 类似资料:
  • 问题内容: 我想在定义类时注册一个类的实例。理想情况下,以下代码可以解决问题。 不幸的是,此代码生成错误。 发生的事情是在我尝试实例化a的行上,但装饰器尚未返回,因此它不存在。 是否使用元类解决了这个问题? 问题答案: 是的,元类可以做到这一点。元类的方法返回该类,因此只需在返回之前注册该类。 前面的示例在Python 2.x中有效。在Python 3.x中,的定义略有不同(虽然未更改,但未显示-

  • 我是新来的Laravel。我有两种不同的用户类型:老师和学生。教师在网站注册表格中注册账户,但学生注册由windows应用程序提供。所以我必须为这个应用程序提供一个API。默认Laravel认证是足够的教师注册,但我可以使用相同的AuthController API注册?我不需要视图或重定向到网站。

  • 我已经为log4j2编写了一个自定义触发策略,该策略将被回滚。在每小时/天/你的工作时间间隔结束时,按照本SO帖子的建议记录文件。 虽然我遵循基于时间的触发策略约定(命名等),但我无法看到我的策略被实例化和使用。 解决方案由3个java文件和一个maven文件组成,可在github上获得 在这里,您可以从政策本身找到主要内容: log4j2。xml文件: 编辑: 在调试过程中,我了解到在log4j

  • 我创建一个自定义注册表单在wordpress买我有以下错误,我似乎找不到什么导致他们。 注意:未定义的变量:new_user在第219行的 /Applications/MAMP/htdocs/****/wp-内容/插件/护理匹配/carematch.php 注意:未定义的变量:错误 /Applications/MAMP/htdocs/****/wp-内容/插件/护理匹配/carematch.php

  • 我想注册一个Avro模式,它在模式注册表上引用另一个Avro模式。 首先,我注册了以下基本Avro模式: 如果我尝试注册以下Avro架构,该架构引用“客户端”属性中的基本架构,则操作失败并出现错误422 这个问题似乎与指定自定义类型字段有关。 任何想法如何添加自定义类型,同时注册相关的模式在模式注册表?