支持以下功能:
模式 | 描述 |
---|---|
none | 创建表时没有指定主键字段。 |
kafka | 主键由以下三列组成:__connect_topic,__connect_partition,__connect_offset这些列的值来自 Kafka 事件的坐标。 |
record_key | 主键由 Kafka 事件的键组成。如果主键是原始类型,则通过设置属性指定要使用的列的名称primary.key.fields。如果主键是结构类型,则结构中的字段映射为主键的列。您可以使用该primary.key.fields属性将主键限制为列的子集。 |
record_value | 主键由 Kafka 事件的值组成。因为 Kafka 事件的值始终是 a Struct,默认情况下,值中的所有字段都成为主键的列。要使用主键中的字段子集,请设置该primary.key.fields属性以在要从中派生主键列的值中指定以逗号分隔的字段列表。 |
每种数据库方言以不同方式处理幂等写入,因为没有用于更新插入操作的 SQL 标准。下面说明了支持的数据库方言使用的 upsert 数据库特定的 DML 语法:
数据库 | 更新语法 |
---|---|
DB2 | MERGE … |
MySQL | INSERT … ON DUPLICATE KEY UPDATE … |
Oracle | MERGE … |
PostgreSQL | INSERT … ON CONFLICT … DO UPDATE SET … |
SQL Server | MERGE … |
Debezium JDBC 接收器连接器支持以下模式演变模式:
模式 | 描述 |
---|---|
none | 连接器不执行任何 DDL 架构演变。 |
basic | 连接器会自动检测事件负载中存在但目标表中不存在的字段。连接器更改目标表以添加新字段。 |
注意:
Debezium JDBC 接收器连接器通过使用逻辑或原始类型映射系统解析列的数据类型。原始类型包括整数、浮点数、布尔值、字符串和字节等值。通常,这些类型仅用特定的 Kafka Connect 类型代码表示Schema。逻辑数据类型通常是复杂类型,包括值,例如Struct具有一组固定字段名称和模式的基于 的类型,或使用特定编码表示的值,例如自纪元以来的天数。
以下示例显示了原始数据类型和逻辑数据类型的代表性结构:
原始字段模式
{
"schema": {
"type": "INT64"
}
}
逻辑字段模式
[
"schema": {
"type": "INT64",
"name": "org.apache.kafka.connect.data.Date"
}
]
Kafka Connect 不是这些复杂的逻辑类型的唯一来源。事实上,Debezium 源连接器生成的更改事件具有具有相似逻辑类型的字段,以表示各种不同的数据类型,包括但不限于时间戳、日期,甚至 JSON 数据。
Debezium JDBC 接收器连接器使用这些原始和逻辑类型将列的类型解析为表示列类型的 JDBC SQL 代码。然后,底层 Hibernate 持久性框架使用这些 JDBC SQL 代码将列的类型解析为所用方言的逻辑数据类型。下表说明了 Kafka Connect 和 JDBC SQL 类型之间以及 Debezium 和 JDBC SQL 类型之间的原始和逻辑映射。实际的最终列类型因每种数据库类型而异。
Kafka Connect原始类型和列数据类型之间的映射:
原始类型 | JDBC SQL类型 |
---|---|
INT8 | Types.TINYINT |
INT16 | Types.SMALLINT |
INT32 | Types.INTEGER |
INT64 | Types.BIGINT |
FLOAT32 | Types.FLOAT |
FLOAT64 | Types.DOUBLE |
BOOLEAN | Types.BOOLEAN |
STRING | Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES | Types.VARBINARY |
Kafka Connect 逻辑类型和列数据类型之间的映射:
逻辑类型 | JDBC SQL类型 |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Types.TIMESTAMP |
Debezium 逻辑类型和列数据类型之间的映射:
逻辑类型 | JDBC SQL类型 |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Types.TIMESTAMP |
io.debezium.time.MicroTime | Types.TIMESTAMP |
io.debezium.time.NanoTime | Types.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Types.TIMESTAMP |
io.debezium.time.MicroTimestamp | Types.TIMESTAMP |
io.debezium.time.NanoTimestamp | Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | Types.DOUBLE |
如果数据库不支持带时区的时间或时间戳,则映射解析为不带时区的等效映射。
Debezium特定的逻辑类型和列数据类型之间的映射
逻辑类型 | MySQL SQL 类型 | PostgreSQL SQL 类型 | SQL Server SQL 类型 |
---|---|---|---|
io.debezium.data.Bits | bit(n) | bit(n) or bit varying | varbinary(n) |
io.debezium.data.Enum | enum | Types.VARCHAR | n/a |
io.debezium.data.Json | json | json | n/a |
io.debezium.data.EnumSet | set | n/a | n/a |
io.debezium.time.Year | year(n) | n/a | n/a |
io.debezium.time.MicroDuration | n/a | interval | n/a |
io.debezium.data.Ltree | n/a | ltree | n/a |
io.debezium.data.Uuid | n/a | uuid | n/a |
io.debezium.data.Xml | n/a | xml | xml |
除了上面的原始和逻辑映射之外,如果更改事件的源是 Debezium 源连接器,则列类型的分辨率及其长度、精度和比例可能会受到启用列或数据类型的进一步影响传播。要强制传播,必须在源连接器配置中设置以下属性之一:
Debezium JDBC 接收器连接器应用具有更高优先级的值。
例如,假设更改事件中包含以下字段架构:
启用列或数据类型传播的 Debezium 更改事件字段模式
{
"schema": {
"type": "INT8",
"parameters": {
"__debezium.source.column.type": "TINYINT",
"__debezium.source.column.length": "1"
}
}
}
在前面的示例中,如果未设置架构参数,则 Debezium JDBC 接收器连接器会将此字段映射到列类型Types.SMALLINT。Types.SMALLINT可以有不同的逻辑数据库类型,这取决于数据库。对于 MySQL,该示例将转换为TINYINT没有指定长度的列类型。如果为源连接器启用了列或数据类型传播,则 Debezium JDBC 接收器连接器使用映射信息来优化数据类型映射过程并创建类型为TINYINT(1)的列。
通常,当源数据库和接收器数据库使用相同类型的数据库时,使用列或数据类型传播的效果会大得多。我们一直在寻找改进这种跨异构数据库映射的方法,当前的类型系统允许我们根据反馈继续改进这些映射。
要部署 Debezium JDBC 连接器,您需要安装 Debezium JDBC 连接器存档、配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。
先决条件:
程序:
通常,通过提交指定连接器配置属性的 JSON 请求来注册 Debezium JDBC 连接器。以下示例显示了一个 JSON 请求,用于注册 Debezium JDBC 接收器连接器的实例,该连接器使用来自使用orders最常见配置设置调用的主题的事件:
Debezium JDBC连接器配置
{
"name": "jdbc-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost/db",
"connection.username": "pguser",
"connection.password": "pgpassword",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic",
"database.time_zone": "UTC"
}
}
可以使用命令将此配置发送POST到正在运行的 Kafka Connect 服务。该服务记录配置并启动执行以下操作的接收器连接器任务:
Debezium JDBC 接收器连接器有几个配置属性,可以使用它们来实现满足您需要的连接器行为。许多属性都有默认值。有关属性的信息组织如下:
连接属性:
属性 | 默认值 | 描述 |
---|---|---|
connection.url | 无默认 | 用于连接到数据库的 JDBC 连接 URL。 |
connection.username | 无默认 | 连接器用于连接到数据库的数据库用户帐户的名称。 |
connection.password | 无默认 | 连接器用于连接到数据库的密码。 |
connection.pool.min_size | 5 | 指定池中的最小连接数。 |
connection.pool.min_size | 32 | 指定池维护的最大并发连接数。 |
connection.pool.acquire_increment | 32 | 指定在连接池超过其最大大小时连接器尝试获取的连接数。 |
connection.pool.timeout | 1800 | 指定未使用的连接在被丢弃之前保留的秒数。 |
运行时属性:
属性 | 默认值 | 描述 |
---|---|---|
database.time_zone | UTC | 指定插入 JDBC 时间值时使用的时区。 |
delete.enabled | false | 指定连接器是否处理DELETE逻辑删除事件并从数据库中删除相应的行。使用此选项需要您将 设置primary.key.mode为record.key。 |
insert.mode | insert | 指定用于将事件插入数据库的策略。以下选项可用:insert指定所有事件都应构造INSERT基于 SQL 语句。仅当不使用主键时,或者当您可以确定不会对具有现有主键值的行进行更新时,才使用此选项。update指定所有事件都应构造UPDATE基于 SQL 语句。仅当您可以确定连接器仅接收适用于现有行的事件时才使用此选项。upsert指定连接器使用upsert语义将事件添加到表中。即如果主键不存在,则连接器执行操作INSERT,如果键存在,则连接器执行操作UPDATE。当需要幂等写入时,应将连接器配置为使用此选项。 |
primary.key.mode | none | 指定连接器如何从事件中解析主键列。none指定不创建主键列。kafka指定连接器使用 Kafka 坐标作为主键列。关键坐标是根据事件的主题名称、分区和偏移量定义的,并映射到具有以下名称的列:__connect_topic、__connect_partition、__connect_offset。record_key指定主键列源自事件的记录键。如果记录键是原始类型,则primary.key.fields需要该属性指定主键列的名称。如果记录键是结构类型,primary.key.fields则该属性是可选的,可用于将事件键中的列子集指定为表的主键。record_value指定主键列源自事件的值。可以设置该primary.key.fields属性以将主键定义为事件值的字段子集;否则默认使用所有字段。 |
primary.key.fields | 无默认 | 主键列的名称或从中派生主键的以逗号分隔的字段列表。当primary.key.mode设置为record_key并且事件的键是原始类型时,预计此属性指定要用于键的列名。当使用非原始键primary.key.mode设置为或时,预计此属性指定键或值中以逗号分隔的字段名称列表。如果使用非原始键设置为或 ,并且未指定此属性,则连接器根据指定的模式从记录键或记录值的所有字段派生主键。 |
quote.identifiers | false | 指定生成的 SQL 语句是否使用引号来分隔表名和列名。 |
schema.evolution | none | 指定连接器如何发展目标表架构。none指定连接器不发展目标模式。basic指定发生基本进化。连接器通过将传入事件的记录模式与数据库表结构进行比较,将缺失的列添加到表中。 |
table.name.format | ${topic} | 指定一个字符串,该字符串根据事件的主题名称确定目标表名称的格式。占位符${topic}替换为主题名称。 |
可扩展属性:
属性 | 默认值 | 描述 |
---|---|---|
column.naming.strategy | i.d.c.j.n.DefaultColumnNamingStrategy | 指定 ColumnNamingStrategy 实现的完全限定类名,连接器使用该实现从事件字段名中解析列名。默认情况下,连接器使用字段名称作为列名称。 |
table.naming.strategy | i.d.c.j.n.DefaultTableNamingStrategy | TableNamingStrategy指定连接器用于从传入事件主题名称解析表名称的实现的完全限定类名称。默认行为是:${topic}将配置属性中的占位符替换table.name.format为事件的主题。通过将点 (.) 替换为下划线 (_) 来清理表名。 |
1.是否需要ExtractNewRecordState单条消息转换?
2.如果更改了列的类型,或者重命名或删除了列,这是否由模式演化处理?
3.如果列的类型未解析为我想要的类型,我如何强制映射到不同的数据类型?
4.如何在不更改 Kafka 主题名称的情况下为表名指定前缀或后缀?
5.为什么有些列会自动引用,即使未启用标识符引用?
更多内容请参考博主Debezium专栏: