当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect和Key Converter的主题名称策略

宦砚
2023-03-14

我正在尝试设置Debezium MySQL源连接器。我的目标是为每个数据库提供一个主题,因此我正在研究利用主题的可能性,以使主题可以包含不同的消息类型并且它们的模式可以存储在ConFluent Schema注册表中。

根据这里的几个答案,我将键和值转换器主题名策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy。

要将来自同一模式的所有消息重新路由到同一主题,我使用以下配置:

{
  "name": "aws-db-connector",
  "config": {
    "group.id": "aws-db-group",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "secret-pw",
    "database.server.id": "184054",
    "database.server.name": "aws-db",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.aws-db",
    "database.include.list": "db1,db2",
    "transforms": "unwrap,Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.topic.replacement": "$2_schema",
    "transforms.Reroute.key.field.name": "table",
    "transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.key.field.replacement": "$3"
  }
}

在我的docker-compose文件中,我设置了:

- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081

对于值,这是完美的工作。我可以看到我的模式注册表包含多个主题,格式为

不幸的是,对于键,这个策略没有按预期工作,我只有一个模式主题:它看起来像是包含新主题名而不是原始主题名。如果一个字段名在不同的表中具有不同的类型,则会导致冲突和不兼容错误。

生成密钥主题时,是否有任何方法可以提供适当的RecordName

编辑-添加示例:

假设我的数据库包含三个表,table1table2table3

表1:

CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` TEXT,
    PRIMARY KEY (`id`)
);

表2:

CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` BINARY,
    PRIMARY KEY (`id`)
);

表3:

CREATE TABLE `table3` (
    `id` BINARY NOT NULL,
    `name` INT,
    PRIMARY KEY (`id`)
);

使用上述配置运行Debezium,它会在模式注册表中创建以下值主题:

  • db1_schema.db1-aws-db。表1数值

以及以下关键主题:

  • db1\u schema.db1\u schema-Key

轮到table3时,Debezium连接器失败,因为id列以int类型注册在模式注册表主题中,并且它与table3中的bytes类型不兼容。因此我得到了这个错误:

正在注册的模式与早期模式不兼容;错误代码:409

我所期望的是,也会为密钥创建单独的主题:

  • db1_模式。aws-db-db1。表1键
  • db1_模式。aws-db-db1。表2键
  • db1_schema.aws-db-db1.table3-Key

这样,具有不同键模式的消息可以存储在同一个主题中。


共有1个答案

罗翔
2023-03-14

这似乎是默认情况下Debezium的工作方式。它只会为每个主题创建一个键模式,但会创建不同的值模式,因此重新路由到该主题的所有消息都应该共享相同的键结构。

为了解决这个问题,应该使用RegexRouter。在重新路由之前应用InsertField转换也会将原始主题名添加到键中,并且可以从中提取表名。

"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table"
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.replacement": "$2_schema",
 类似资料:
  • 下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?

  • 问题内容: 我正在使用version 。在此工具中,我发布了示例项目(即),并订阅了该项目以获取使用者密钥和机密。该工具还为我提供了可以正常运行的CURL命令。 下面的CURL命令运行正常。 现在我正在努力开发的利用代码将连接到无,即,方式看一下上面的curl命令。到目前为止,我已经开发了以下代码,但是运行时,我看到以下错误。 我面临的错误。 请建议我们如何以不安全的方式连接到SSL站点,就像cu

  • 我正在使用谷歌应用程序脚本更改几个谷歌课堂中的几个主题的名称。我使用只更改主题的'name'值,但当我查看教室时,我的脚本没有更改任何内容。 这里有一个例子:我有两个教室(课程ID'100000000000'和'100000000001')。在每个教室里,我有三个主题(主题名为'Topic1'、'Topic2'和'Topic3')。我想把前两个主题的名称分别改为'NewTopic1'和'NewTo

  • 我在数字海洋上有一个开发服务器,使用的是Ubuntu 14.04。我把我的外壳换成了ZSH,并决定采用不可知论者的主题。为了让< code>user@hostname停止显示,我在我的。zshrc文件。 由于某种原因,在 ubuntu 服务器上,这不起作用。主机名仍然显示,并且不会消失。我正在Mac OSX上做同样的事情,它工作正常。 以下是一些截图: 有人知道发生了什么吗?我甚至尝试了< cod

  • 我们在Spring Boot应用程序中使用Kafka Cloud Stream向Kafka发送数据。这样地 我想知道除了直接从 yaml 文件中读取之外,是否可以从消息通道获取主题名称? 主题名称存在于kafka.yaml中

  • 我想知道Android Studio默认编辑器主题的名字。我知道黑暗的那个叫Darcula,但是我找不到android工作室默认主题的名字。它只是说Default或IntelliJ在那里,但我想知道具体的颜色主题的名称,或任何其他颜色主题几乎相同的默认的一个 我刚开始使用atom text editor完成其他编程任务,但我喜欢android Studio的默认主题。所以我想知道缺省主题的名称,以