当前位置: 首页 > 知识库问答 >
问题:

Kafka jdbc接收器连接器创建的数据类型与原始数据类型不匹配

越星晖
2023-03-14

我使用Kafka和Kafka Connect将MS SQL Server数据库复制到MySQL,使用debezium SQL Server CDC源连接器和汇合的JDBC汇连接器。“auto.create”设置为true,接收连接器确实创建了表,但某些数据类型不匹配。在SQL Server中,我有

CREATE TABLE employees (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE,
  start_date DATE,
  salary INT,
  secret FLOAT,
  create_time TIME
);

但在 MySQL 中,它创建了以下内容:

mysql> desc employees;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| id          | int         | NO   | PRI | NULL    |       |
| first_name  | text        | NO   |     | NULL    |       |
| last_name   | text        | NO   |     | NULL    |       |
| email       | text        | NO   |     | NULL    |       |
| start_date  | int         | YES  |     | NULL    |       |
| salary      | int         | YES  |     | NULL    |       |
| secret      | double      | YES  |     | NULL    |       |
| create_time | bigint      | YES  |     | NULL    |       |
| messageTS   | datetime(3) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+

忽略消息,这是我在 SMT 中添加的额外字段。

名字、姓氏、电子邮件、开始日期和创建时间的数据类型都不匹配。它将VARCHAR(255)转换为文本,将DATE转换为int,将TIME转换为bigint。

只是想知道是否有什么配置错误?

我正在使用docker运行SQLServer 2019和MySQL 9.0.28。

我还尝试了禁用自动创建的建议,并使用适当的数据类型自动进化和预先创建表。

mysql> desc employees;
+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| id          | int          | NO   | PRI | NULL    | auto_increment |
| first_name  | varchar(255) | NO   |     | NULL    |                |
| last_name   | varchar(255) | NO   |     | NULL    |                |
| email       | varchar(255) | NO   |     | NULL    |                |
| start_date  | date         | NO   |     | NULL    |                |
| salary      | int          | NO   |     | NULL    |                |
| secret      | double       | NO   |     | NULL    |                |
| create_time | datetime     | NO   |     | NULL    |                |
| messageTS   | datetime     | NO   |     | NULL    |                |
+-------------+--------------+------+-----+---------+----------------+

但是当尝试插入数据库时,它会给出以下异常:

kafka-connect  | [2022-03-04 19:55:07,331] INFO Setting metadata for table "employees" to Table{name='"employees"', type=TABLE columns=[Column{'first_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'secret', isPrimaryKey=false, allowsNull=false, sqlType=DOUBLE}, Column{'salary', isPrimaryKey=false, allowsNull=false, sqlType=INT}, Column{'start_date', isPrimaryKey=false, allowsNull=false, sqlType=DATE}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'last_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'messageTS', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}, Column{'create_time', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}]} (io.confluent.connect.jdbc.util.TableDefinitions)
kafka-connect  | [2022-03-04 19:55:07,382] WARN Write of 4 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
kafka-connect  | java.sql.BatchUpdateException: Data truncation: Incorrect date value: '19055' for column 'start_date' at row 1

该消息的价值是

{"id":1002,"first_name":"George","last_name":"Bailey","email":"george.bailey@acme.com","start_date":{"int":19055},"salary":{"int":100000},"secret":{"double":0.867153569942739},"create_time":{"long":1646421476477}}

start_date字段的消息模式为

    {
      "name": "start_date",
      "type": [
        "null",
        {
          "type": "int",
          "connect.version": 1,
          "connect.name": "io.debezium.time.Date"
        }
      ],
      "default": null
    }

看起来它不知道如何转换io.debezium.time。Date转换为Date,并将其视为int。

对此的任何指示都非常感谢。

源配置:

{
    "name": "SimpleSQLServerCDC",
    "config":{
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "tasks.max":1,
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "confluent.topic.bootstrap.servers":"kafka:29092",
      "database.hostname" : "sqlserver",
      "database.port" : "1433",
      "database.user" : "sa",
      "database.password" : "",
      "database.dbname" : "testDB",
      "database.server.name" : "corporation",

      "database.history.kafka.topic": "dbhistory.corporation",
      "database.history.kafka.bootstrap.servers" : "kafka:29092",

      "topic.creation.default.replication.factor": 1,
      "topic.creation.default.partitions": 10,
      "topic.creation.default.cleanup.policy": "delete"
    }
  }

接收器配置:

{
  "name": "SimpleMySQLJDBC",
  "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url": "jdbc:mysql://mysql:3306/sinkdb",
          "connection.user": "user",
          "connection.password": "",
          "tasks.max": "2",
          "topics.regex": "corporation.dbo.*",
          "auto.create": "true",
          "auto.evolve": "true",
          "dialect.name": "MySqlDatabaseDialect",
          "insert.mode": "upsert",
          "pk.mode": "record_key",
          "pk.fields":"id",
          "delete.enabled": "true",
          "batch.size": 1,
          "key.converter":"io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081",

          "transforms":"unwrap,dropPrefix,insertTS",

          "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropPrefix.regex":"corporation.dbo.(.*)",
          "transforms.dropPrefix.replacement":"$1",

          "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
          "transforms.unwrap.drop.tombstones":"false",
          "transforms.unwrap.delete.handling.mode":"drop",

          "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field": "messageTS",

          "errors.log.enable": "true",
          "errors.log.include.messages": "true",
          "errors.tolerance":"all",
          "errors.deadletterqueue.topic.name":"dlq-mysql",
          "errors.deadletterqueue.context.headers.enable": "true",
          "errors.deadletterqueue.topic.replication.factor":"1"
      }
}

共有3个答案

江育
2023-03-14

我刚刚做了一个SMT,它将所有时间戳字段转换为字符串。希望它能有所帮助。

https://github.com/FX-HAO/kafka-connect-debezium-tranforms

陆承宣
2023-03-14

您需要进行2次更改
在源连接器中添加"time.precision.mode":"连接"
在接收器连接器中添加

"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "dob",
孙玺
2023-03-14

将VARCHAR(255)转换为文本

字段的字符限制不通过Connect API数据类型传递。任何类似字符串的数据都将成为TEXT列类型。

日期到int,时间到bigint

我认为,默认情况下,日期时间值会转换为 Unix 纪元。您可以使用时间戳转换器转换转换为其他格式

总的来说,如果您想要准确地保留类型,请禁用从接收器连接器自动创建表,并使用您想要的类型预先创建表。

 类似资料:
  • 我知道Kafka JDBC接收器连接器对于数组数据类型有一些缺点。然而,是否有可能将接收器连接器与一个简单的Kafka连接器结合起来,该连接器可以支持数组数据类型。如何从Kafka配置中筛选并切换到简单的Kafka连接器配置简单的Kafka配置意味着什么?Kafka如何连接支持阵列字段 这是否可能,因为它将作为字符串而不是数组消耗给db

  • 问题内容: 我是Kotlin的新手,正在玩数据类型。我选择了一个类型,然后尝试通过说来将其强制转换为a ,这在Java中是有效的(从语法上讲,但这是正确的)。但是,此操作失败,表示无法将Int强制转换为Double。我假设这是因为它是基于Integer类而不是原始的int数据类型构建的。我是正确的,最有价值的方法是什么?有一个功能,但这似乎效率低下且笨拙。 问题答案: 我花了一个类型,然后试图将它

  • 我是科特林的新手,我在玩数据类型。我取了一个<code>Int</code>类型,然后尝试将其转换为<code>Double</code>,将<code>num表示为Double>/code,这是一个在java中有效的调用(非语法上,但你明白了)。然而,这失败了,表示Int不能强制转换为Double。我假设这是因为它是基于Integer类而不是原始int数据类型构建的。我说得对吗?什么是最有效的价

  • Linked lists 和Perl一样,OCaml也将对列表的支持直接内建在语言中了。OCaml中一个列表的所有元素的类型必须一致。使用以下格式来写列表: # [1; 2; 3];; - : int list = [1; 2; 3] (注意是分号,不是逗号)。 [] 表示空列表。 一个列表有一个“头”(第一个元素)和一个“尾”(剩下的元素)。头是一个元素,而尾则是一个列表,所以前面的例子中,表

  • 是否可以只使用一行代码就将输入变量的数据类型(可以是任何基元类型,int、bool、float、double)作为字符串返回?我知道对于字符串类型,使用和可以很容易地做到这一点,但我不确定如何使用这些方法返回基元类型的类型。我还想保持我的代码非常简短,最好只用一行。 我到处找了找,找不到任何地方,这个问题已经以我要求的方式得到了回答。

  • 在以下代码中,我得到了此错误: TypeError[ERR_INVALID_ARG_TYPE]:原始参数的类型必须是Function。接收类型未定义 它说问题在第31行: 我在使用promisify图书馆时遇到问题。