当前位置: 首页 > 知识库问答 >
问题:

如何在从debezium kafka connect接收的CDC事件中获取表名和数据库名

王修为
2023-03-14

设置:我在MSSQL服务器上启用了CDC,CDC事件使用debezium kafka连接(源)馈送到Kafka。此外,多个表CDC事件路由到Kafka中的单个主题。

问题:因为我在kafka主题中有不止一个表数据,所以我想在CDC数据中有表名和库名。

我在MySQL CDC中获得了表名和数据库名,但在MS SQL CDC中没有。

下面是SQL服务器的Debezium源连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "cdc-user_profile-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "<<hostname>>",
    "database.port": "<<port>>",
    "database.user": "<<username>>",
    "database.password": "<<password>>",
    "database.server.name": "test",
    "database.dbname": "testDb",
    "table.whitelist": "schema01.table1,schema01.table2",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "digital.user_profile.schema.audit",
    "database.history.store.only.monitored.tables.ddl": true,
    "include.schema.changes": false,
    "event.deserialization.failure.handling.mode": "fail",
    "snapshot.mode": "initial_schema_only",
    "snapshot.locking.mode": "none",
    "transforms":"addStaticField,topicRoute",
    "transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addStaticField.static.field":"source_system",
    "transforms.addStaticField.static.value":"source_system_1",
    "transforms.topicRoute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRoute.regex":"(.*)",
    "transforms.topicRoute.replacement":"digital.user_profile",
    "errors.tolerance": "none",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}'

我得到以下输出(演示数据)

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

我的要求是得到如下

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false,
    "db": "testDb",
    "table": "table1/table2"
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

共有2个答案

屠盛
2023-03-14

Debezium Kafka-Connect通常将每个表中的数据放在一个单独的主题中,主题名称的格式hostname.database.table.我们通常使用主题名称来区分源表

如果您要手动将所有表中的数据放入一个主题中,那么您可能还需要手动添加表和库名。

西门振
2023-03-14

这是计划的一部分https://issues.jboss.org/browse/DBZ-875问题

 类似资料:
  • 问题内容: 我已经搜索了许多网站,但没有遇到任何从单个数据库获取表名的细节的代码或教程。 假设我有4个数据库,并且我想要数据库中所有表的名称,我可以使用什么查询? 问题答案:

  • 问题内容: 我对数据库及其在CRUD操作之外提供的功能不是很熟悉。 我的研究使我 引人注目 。基本上,触发器似乎提供了这种类型的功能: (来自维基百科) 通常,有三个触发事件导致触发器“触发”: INSERT事件(将新记录插入数据库中)。 UPDATE事件(因为记录正在更改)。 DELETE事件(因为一条记录正在被删除)。 我的问题是:当使用某种触发语义对记录进行更新/删除/插入时,数据库是否可以

  • 问题内容: 我正在尝试编写一个程序,该程序将通过NotesSQL驱动程序将整个Lotus Notes数据库转储到文件中。我通过jdbc:odbc连接并有 我可以执行选择并从Lotus Notes数据库获取数据 这是代码 是否有更好的方法通过NotesSQL连接到Lotus Notes数据库?因为使用我的代码,我只获得名称的值… 问题答案: 我知道您正在尝试使用JDBC和NotesSQL。但是,根据

  • 问题内容: 如标题中所指定,我想在sqlserver中获取数据库名称,我所知道的所有信息都是数据源名称,用于获取Connection对象的登录名/密码,请在Java中显示一些有关如何正确检索数据库名称的指针,谢谢! 甚至 问题答案: 从连接对象获取一个实例。 数据库名称可以通过或方法获得(取决于JDBC驱动程序的供应商)。 或使用或方法。 如果您有兴趣获得Oracle数据库服务器或Oracle数据

  • 问题内容: 如何获得特定表中的列名列表? IE。 火鸟表: 得到这样的列表: 问题答案: 如果要获取特定表中的列名列表,则这是您需要的sql查询: 我在firebird 2.5中尝试过此方法,并且可以正常工作。 顺便说一句,YOUR-TABLE-NAME周围的单引号是必需的

  • 我有两张桌子,彼此相连。我如何使用Spring数据jpa从数据库中获取它们? 代码如下, 结果:结果