我使用Kafka和Kafka Connect将MS SQL Server数据库复制到MySQL,使用debezium SQL Server CDC源连接器和汇合的JDBC汇连接器。“auto.create”设置为true,接收连接器确实创建了表,但某些数据类型不匹配。在SQL Server中,我有
CREATE TABLE employees (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
start_date DATE,
salary INT,
secret FLOAT,
create_time TIME
);
但在 MySQL 中,它创建了以下内容:
mysql> desc employees;
+-------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| id | int | NO | PRI | NULL | |
| first_name | text | NO | | NULL | |
| last_name | text | NO | | NULL | |
| email | text | NO | | NULL | |
| start_date | int | YES | | NULL | |
| salary | int | YES | | NULL | |
| secret | double | YES | | NULL | |
| create_time | bigint | YES | | NULL | |
| messageTS | datetime(3) | YES | | NULL | |
+-------------+-------------+------+-----+---------+-------+
忽略消息,这是我在 SMT 中添加的额外字段。
名字、姓氏、电子邮件、开始日期和创建时间的数据类型都不匹配。它将VARCHAR(255)转换为文本,将DATE转换为int,将TIME转换为bigint。
只是想知道是否有什么配置错误?
我正在使用docker运行SQLServer 2019和MySQL 9.0.28。
我还尝试了禁用自动创建的建议,并使用适当的数据类型自动进化和预先创建表。
mysql> desc employees;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| first_name | varchar(255) | NO | | NULL | |
| last_name | varchar(255) | NO | | NULL | |
| email | varchar(255) | NO | | NULL | |
| start_date | date | NO | | NULL | |
| salary | int | NO | | NULL | |
| secret | double | NO | | NULL | |
| create_time | datetime | NO | | NULL | |
| messageTS | datetime | NO | | NULL | |
+-------------+--------------+------+-----+---------+----------------+
但是当尝试插入数据库时,它会给出以下异常:
kafka-connect | [2022-03-04 19:55:07,331] INFO Setting metadata for table "employees" to Table{name='"employees"', type=TABLE columns=[Column{'first_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'secret', isPrimaryKey=false, allowsNull=false, sqlType=DOUBLE}, Column{'salary', isPrimaryKey=false, allowsNull=false, sqlType=INT}, Column{'start_date', isPrimaryKey=false, allowsNull=false, sqlType=DATE}, Column{'email', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=INT}, Column{'last_name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'messageTS', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}, Column{'create_time', isPrimaryKey=false, allowsNull=false, sqlType=DATETIME}]} (io.confluent.connect.jdbc.util.TableDefinitions)
kafka-connect | [2022-03-04 19:55:07,382] WARN Write of 4 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
kafka-connect | java.sql.BatchUpdateException: Data truncation: Incorrect date value: '19055' for column 'start_date' at row 1
该消息的价值是
{"id":1002,"first_name":"George","last_name":"Bailey","email":"george.bailey@acme.com","start_date":{"int":19055},"salary":{"int":100000},"secret":{"double":0.867153569942739},"create_time":{"long":1646421476477}}
start_date字段的消息模式为
{
"name": "start_date",
"type": [
"null",
{
"type": "int",
"connect.version": 1,
"connect.name": "io.debezium.time.Date"
}
],
"default": null
}
看起来它不知道如何转换io.debezium.time。Date转换为Date,并将其视为int。
对此的任何指示都非常感谢。
源配置:
{
"name": "SimpleSQLServerCDC",
"config":{
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max":1,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"confluent.topic.bootstrap.servers":"kafka:29092",
"database.hostname" : "sqlserver",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "",
"database.dbname" : "testDB",
"database.server.name" : "corporation",
"database.history.kafka.topic": "dbhistory.corporation",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "delete"
}
}
接收器配置:
{
"name": "SimpleMySQLJDBC",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://mysql:3306/sinkdb",
"connection.user": "user",
"connection.password": "",
"tasks.max": "2",
"topics.regex": "corporation.dbo.*",
"auto.create": "true",
"auto.evolve": "true",
"dialect.name": "MySqlDatabaseDialect",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields":"id",
"delete.enabled": "true",
"batch.size": 1,
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms":"unwrap,dropPrefix,insertTS",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"corporation.dbo.(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"drop",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq-mysql",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor":"1"
}
}
我刚刚做了一个SMT,它将所有时间戳字段转换为字符串。希望它能有所帮助。
https://github.com/FX-HAO/kafka-connect-debezium-tranforms
您需要进行2次更改
在源连接器中添加"time.precision.mode":"连接"
在接收器连接器中添加
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "dob",
将VARCHAR(255)转换为文本
字段的字符限制不通过Connect API数据类型传递。任何类似字符串的数据都将成为TEXT
列类型。
日期到int,时间到bigint
我认为,默认情况下,日期时间值会转换为 Unix 纪元。您可以使用时间戳转换器
转换转换为其他格式
总的来说,如果您想要准确地保留类型,请禁用从接收器连接器自动创建表,并使用您想要的类型预先创建表。
我知道Kafka JDBC接收器连接器对于数组数据类型有一些缺点。然而,是否有可能将接收器连接器与一个简单的Kafka连接器结合起来,该连接器可以支持数组数据类型。如何从Kafka配置中筛选并切换到简单的Kafka连接器配置简单的Kafka配置意味着什么?Kafka如何连接支持阵列字段 这是否可能,因为它将作为字符串而不是数组消耗给db
问题内容: 我是Kotlin的新手,正在玩数据类型。我选择了一个类型,然后尝试通过说来将其强制转换为a ,这在Java中是有效的(从语法上讲,但这是正确的)。但是,此操作失败,表示无法将Int强制转换为Double。我假设这是因为它是基于Integer类而不是原始的int数据类型构建的。我是正确的,最有价值的方法是什么?有一个功能,但这似乎效率低下且笨拙。 问题答案: 我花了一个类型,然后试图将它
我是科特林的新手,我在玩数据类型。我取了一个<code>Int</code>类型,然后尝试将其转换为<code>Double</code>,将<code>num表示为Double>/code,这是一个在java中有效的调用(非语法上,但你明白了)。然而,这失败了,表示Int不能强制转换为Double。我假设这是因为它是基于Integer类而不是原始int数据类型构建的。我说得对吗?什么是最有效的价
Linked lists 和Perl一样,OCaml也将对列表的支持直接内建在语言中了。OCaml中一个列表的所有元素的类型必须一致。使用以下格式来写列表: # [1; 2; 3];; - : int list = [1; 2; 3] (注意是分号,不是逗号)。 [] 表示空列表。 一个列表有一个“头”(第一个元素)和一个“尾”(剩下的元素)。头是一个元素,而尾则是一个列表,所以前面的例子中,表
是否可以只使用一行代码就将输入变量的数据类型(可以是任何基元类型,int、bool、float、double)作为字符串返回?我知道对于字符串类型,使用和可以很容易地做到这一点,但我不确定如何使用这些方法返回基元类型的类型。我还想保持我的代码非常简短,最好只用一行。 我到处找了找,找不到任何地方,这个问题已经以我要求的方式得到了回答。
在以下代码中,我得到了此错误: TypeError[ERR_INVALID_ARG_TYPE]:原始参数的类型必须是Function。接收类型未定义 它说问题在第31行: 我在使用promisify图书馆时遇到问题。