当前位置: 首页 > 知识库问答 >
问题:

Debezium MongoDB连接器不执行初始快照

时修贤
2023-03-14

我正在使用MongoDB atlas和一个分片副本集集群,以及文档中描述的Debezium MongoDB连接器。

这是我当前配置的样子(运行独立设置):

name=dev-mongodb
connector.class=io.debezium.connector.mongodb.MongoDbConnector
tasks.max=4
mongodb.hosts=<some-url>.mongodb.net:27017
mongodb.name=mongodb
mongodb.user=<admin_user>
mongodb.password=<admin_user_pw>
database.include.list=<list_of_databases>
database.history.kafka.bootstrap.servers=<list_of_aws_msk_brokers>
database.history.kafka.topic=mongodb.history
include.schema.changes=true
mongodb.ssl.enabled=true

我可以在Kafka主题中接收CDC事件,但文档中描述的初始快照从未制作过。我尝试了另一种mongodb。名称导致创建和使用完全不同的主题集,但结果相同。

MongoDB oplog有大约200万行,kafka主题总共几乎没有几千条消息。

进一步挖掘时,连接器似乎记录了oplog最后一个位置的偏移量。是否可以重置此偏移?

共有1个答案

娄鹤轩
2023-03-14

在我看来,您在多个部署中使用了相同的连接器名称,这意味着尽管更改了配置并试图重置连接器的状态,它仍会继续查找先前的偏移量并恢复oplog位置。

有两种选择:

  • 使用完全不同的连接器名称创建新连接器
  • 手动清除接头的偏移

很多用户更喜欢第一个选项,因为它最简单。Kafka会根据连接器的名称记录连接器的偏移量,因此,只需调整连接器的名称,Kafka就会知道连接器是全新的,它找不到任何要恢复的持久偏移量。

第二个选项有点复杂,因为您需要首先找到存储偏移量的Kafka主题,通常默认情况下这是连接-偏移量,但可以被重写。一旦您知道该主题,您应该关闭所有使用该主题的连接器。如果您在连接器使用该主题时调整该主题,它可能会导致意外行为。

使用Kafka提供的kafkacat工具,您需要运行以下假定为默认主题名称的工具,因此相应地进行调整:

$ kafkacat -b localhost:9092 -t connect-offsets -C -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o\n'

这将生成一些输出,注意“键”和“分区”很重要。为了重置偏移量,您需要使用正确的“Key”和“Partition”值在主题中有效地写入一个NULL(或tombstone)。

假设以上提供了这一产出:

% Reached end of topic connect-offsets [0] at offset 0
% Reached end of topic connect-offsets [1] at offset 0
[…]
Key (52 bytes): ["source-file-01",{"filename":"/data/testdata.txt"}]
Value (15 bytes): {"position":87}
Timestamp: 1565859303551
Partition: 20
Offset: 0
[…]

您需要执行以下命令:

$ echo '["source-file-01",{"filename":"/data/testdata.txt"}]#' | \
    kafkacat -b localhost:9092 -t connect-offsets -P -Z -K# -p 20

在echo语句中,我们指定键后跟kafkacat参数-K#定义的键分隔符#,以及-Z选项,该选项将空值发送为NULL-p参数是指定分区的地方,正确设置键和分区很重要。

完成此操作后,您可以安全地重新启动使用偏移主题的连接器,您应该看到连接器的行为就像是一个全新的部署。

请注意,如果使用的连接器使用的是MySQL、SQL Server或Oracle等数据库历史记录主题,则还需要清除数据库历史记录主题。

然而,正如我之前所说的,使用一个新名称重新部署连接器更为简单,以避免需要使用所有Kafka主题魔法来获得相同的结果。

 类似资料:
  • 根据Debezium SQL Server连接器文档,初始快照仅在连接器首次运行时激发。然而,如果我删除连接器并创建一个新的但具有相同的名称,初始快照也不能工作。这是故意的还是已知的问题?有什么需要帮忙的吗

  • 问题内容: 在当前的问题中(我将文件打印到Java中的物理打印机),我一直在疯狂地遍历代码,试图从所使用的每个类的javadoc中吞噬所有有用的丢失信息。 现在,我从以前的问题中抽出了很多代码,所以有相当一部分我不是自己写的。我注意到的问题是,我抓取的代码正在初始化一个对象,例如实现接口(Doc)的“SimpleDoc”并将其分配给该接口? 小代码段: 现在,据我所知,我们创建了对象。我熟悉继承,

  • 我想使用Confluent的JDBC源连接器将数据从SQL Server表检索到Kafka中。 任何帮助都将不胜感激。

  • 本文向大家介绍docker mysql启动时执行初始化sql,包括了docker mysql启动时执行初始化sql的使用技巧和注意事项,需要的朋友参考一下 1.拉取Mysql镜像 docker pull mysql:5.7 2.检查mysql镜像 3.本地创建mysql外挂的目录 4.启动mysql 5.进入容器,登录mysql,检查发现已创建库 test_database 总结 以上所述是小编给

  • 下面的例子可以解释我的意思: <代码>自动p=标准::使\u共享 变量是默认初始化的(因此具有垃圾值)还是值初始化的(因此具有零值)?我在GCC 5.2和clang 3.6上进行了测试,前者进行值初始化,后者进行默认初始化。我想知道标准对此有什么规定?在我看来,在这种情况下,现代C肯定应该执行值初始化。

  • 在此之前,我的应用程序总是编译和执行。我只是使用 运行时位于C:/Program Files/java和JAVA_HOME**上 这个应用程序来自TextPad,它使用类路径运行良好,执行良好。系统编译正常,但我的JAR文件无法执行 问题:它无法在运行时找到我的类路径(它在编译时工作得很好),给了我这个错误 类路径 .;C:\Program Files(x86)\java\jre7\lib\ext