当前位置: 首页 > 知识库问答 >
问题:

为数据库中的多个表配置debezium连接器

许茂才
2023-03-14

我正在尝试为MySQL数据库中的多个表配置Debezium连接器(我在MySQL 8.0上使用Debezium 1.4)。在Kafka中创建主题时,我的公司有一个命名模式要遵循,而这个模式不允许使用下划线(u),所以我不得不用连字符(-)替换它们

所以,我的主题名称是:

专题1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status". 
- All changes in that table, must go to that topic.

专题二

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table, must go to that topic.

专题三

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table, must go to that topic.

我正在尝试使用转换“ByLogicalTableRouter”,但我找不到解决我的问题的正则表达式解决方案。

{ "name": "debezium.connector",
 "config":
    { 
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium", 
"database.password": "password", 
"database.server.id": "1000", 
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2" 
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2" 
    }
}
  • 在第一个转换中,我试图删除主题路由中重复的模式名称。
  • 在第二个转换中,替换所有的残余物下划线_hiphens-

但有了这些,我得到了下面的错误,这表明它试图将所有内容发送到同一个主题

原因:org。阿帕奇。Kafka。连接错误。SchemaBuilderException:无法创建字段,因为字段名重复_dbz__physicalTableIdentifier

如何进行转换,将每个表的事件转发到各自的主题?

共有1个答案

马阳晖
2023-03-14
  1. 删除架构名称

在第一个转换中,我试图删除主题路由中重复的模式名称。

使用正则表达式转换后,将有两个点,因此需要修复:

"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2" 

您可以尝试使用来自Kafka Connect Common Transformation的ChangeCase SMT。

 类似资料:
  • 从bugu-mongo 2.11版本开始,支持连接到多个数据库。 在前面的示例代码中,我们都只是连接到一个数据库: //默认的数据库连接 BuguConnection conn = BuguFramework.getInstance().createConnection(); conn.setHost("192.168.0.100"); conn.setPort(27017); conn.setU

  •   //默认Mysql数据库 'default_mysql_config' => 'mysql', //Mysql数据库配置 'mysql' => array( 'host'  => 'localhost', 'port' => '3306', 'username' => 'root', 'password' => '', 'db_name' => 'g-framework', 'db_prefi

  • 本文向大家介绍web.config中配置数据库连接的方式,包括了web.config中配置数据库连接的方式的使用技巧和注意事项,需要的朋友参考一下 在网站开发中,数据库操作是经常要用到的操作,ASP.NET中一般做法是在web.config中配置数据库连接代码,然后在程序中调用数据库连接代码,这样做的好处就是当数据库连接代码需要改变的时候,我们只要修改web.config中的数据库连接代码即可,而

  • 由于基于HTTP协议的Web程序是无状态的,因此,在应用程序中使用JDBC时,每次处理客户端请求时都会重新建立数据库连接。如果客户端的请求非常频繁,服务端在处理数据库时将会消耗非常多的资源。因此,在Tomcat中提供了数据库连接池技术。数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是重新建立一个数据库连接。在使用完一个数据库连接后,将其归还数据库连接池

  • 问题内容: 我想拥有一个配置文件,然后在创建会话时将hibernate-configuration-> session-factory-> connection.connection_string属性更改为我希望通过编程的方式?可能吗? 更新: 我相信我可以这样做 我想知道的是,如果这样还行,那么这是处理每个会话连接到不同数据库的不好方法吗?如果是这样,为什么等等。我还想知道是否/如何使用.net

  • 我的WebApp使用多个数据库,我尝试使用GlassFish连接池来管理连接,但我发现配置示例只使用一个数据库。 那么,我该怎么办?创建与我正在使用的数据库数量相同的连接池,或者是否有方法将一个池配置为多个数据库?