当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Connect在MySQL数据库之间同步数据

姜羽
2023-03-14

我正在尝试使用基于Kafka Connect的Confluent在几个MySQL数据库之间同步数据。我在源连接器配置中使用了“批量”作为模式,因为主键类型是 varchar,所以我无法使用递增模式。它工作正常,但我遇到了两个问题:

  1. 似乎它无法同步删除,当源数据库中的数据被删除时,接收器数据库没有任何变化。数据仍存在于接收器数据库中。
  2. 同步数据需要相当长的时间。就我而言,同步具有 2~4k 行的表大约需要 4~4 分钟。我可以理解使用批量模式可能会花费更多时间来同步数据,但这不是太长了吗?

这是我的源连接器配置:

name=test-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
mode=bulk
table.whitelist=a_table

这是我的接收器连接器配置:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1 topics=a_table
connection.url=jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
insert.mode=upsert
pk.mode=record_value
pk.fields=mypk
auto.evolve=true

任何建议都将受到重视。谢谢你。

共有1个答案

徐洋
2023-03-14

>

  • 如果要同步删除操作,则需要使用 CDC,例如 Debezium。JDBC 连接器只能检测存在的记录,而不能检测不存在的记录。

    CDC也比批量获取更有效,因为它监视MySQL事务日志中所需表上的任何事务。

    您的主键是 VARCHAR?哇。如果您不想使用 CDC,我建议使用基于 INT 的密钥,然后使用 JDBC 连接器进行增量加载。或者向表中添加时间戳列,并将其用于增量。

  •  类似资料:
    • 问题内容: 我们正在多家商店中运行带有MySql后端的Java PoS(销售点)应用程序。我想保持商店中的数据库与主机服务器上的数据库同步。 商店中发生某些更改时,应在主机服务器上对其进行更新。我该如何实现? 问题答案: 复制不是很难创建。 这里有一些很好的教程: http://aciddrop.com/2008/01/10/step-by-step-how-to-setup-mysql-data

    • 我们有一个微服务架构,使用Kafka作为服务之间的通信机制。一些服务有自己的数据库。假设用户调用服务A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,这个事件应该作为Kafka主题的一个项目报告给其他服务。确保数据库记录仅在Kafka主题成功更新(本质上是围绕数据库更新和Kafka更新创建分布式事务)时才写入的最佳方法是什么? 我们正在考虑使用spring kafka(在spring

    • 问题内容: 我想知道在InnoDB中是否可以有一个带有 外键的 引用另一个表的表? 如果是这样,该怎么办? 问题答案: 我在http://dev.mysql.com/doc/refman/5.1/en/innodb-foreign-key- constraints.html 上没有看到任何限制。 因此,只需使用 otherdb.othertable ,您会很好。

    • 问题内容: 我对使用MYSQL的PDO有点陌生,这是我的两个文件: 我有一个用于连接数据库的连接类: 我有一个account_info类,用于查询数据库中的数据: 我在index.php页面中都包含了这两个文件: 我只是无法使其正常工作,我没有任何输出,我认为它与示波器有关,但是由于我是PDO和OOP的新手,所以我不知道为什么要修复它的正确方法。提前致谢。 问题答案: 解决方案1 替换为 更换 与

    • 5.8.1.1 Elasticsearch 安装 安装 logstash ElasticSearch中 logstash安装和logstash-input-jdbc插件 安装 logstash-input-jdbc插件 logstash-plugin install logstash-input-jdbc # 在有网点环境下安装,将安装插件后的 logstash 拷贝到内网环境即可使用。 编辑

    • 问题内容: 我有一个Node.js / Express应用程序,该应用程序查询路由内的MySQL数据库,并将结果显示给用户。我的问题是如何在将用户重定向到他们请求的页面之前运行查询并阻止直到两个查询都完成? 在我的示例中,我有2个查询需要在呈现页面之前完成。如果将查询2嵌套在查询1的“结果”回调中,则可以使查询同步运行。但是,当查询数量增加时,这将变得非常复杂。 如何在不将后续查询嵌套在先前查询的