当前位置: 首页 > 知识库问答 >
问题:

MySQL的Debezium刷新超时和OutOfMemoryError错误

蒙才
2023-03-14

使用Debezium 0.7从MySQL读取,但在初始快照阶段会出现刷新超时和OutOfMemoryError错误。看下面的日志,连接器似乎试图一次写入太多消息:

WorkerSourceTask{id=accounts-connector-0} flushing 143706 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
WorkerSourceTask{id=accounts-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
Exception in thread "RMI TCP Connection(idle)" java.lang.OutOfMemoryError: Java heap space
WorkerSourceTask{id=accounts-connector-0} Failed to flush, timed out while waiting for producer to flush outstanding 143706 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]

想知道正确的设置是什么http://debezium.io/docs/connectors/mysql/#connector-大型数据库的属性(

按照以下建议更改了设置,并解决了问题:

OFFSET_FLUSH_TIMEOUT_MS: 60000  # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000  # default 60000
MAX_BATCH_SIZE: 32768  # default 2048
MAX_QUEUE_SIZE: 131072  # default 8192
HEAP_OPTS: '-Xms2g -Xmx2g'  # default '-Xms1g -Xmx1g'

共有3个答案

段干庆
2023-03-14

为了补充Jiri所说的话:

Debezium bugtracker中现在有一个公开问题,如果您有任何关于根本原因、日志或复制的更多信息,请随时提供。

对我来说,改变Jiri在评论中提到的价值观并不能解决问题。唯一可行的解决方法是在同一个worker上创建多个连接器,这些连接器负责每个表的一个子集。要使其工作,需要启动连接器1,等待快照完成,然后启动连接器2,依此类推。在某些情况下,当稍后的连接器开始快照时,较早的连接器将无法刷新。在这些情况下,您可以在所有快照完成后重新启动worker,连接器将再次从binlog中拾取(确保快照模式为“when_needed”!)。

蒋阳华
2023-03-14

我可以确认以上由Jiri Pechanec发布的答案解决了我的问题。以下是我正在使用的配置:

kafka connect worker在worker中配置设置。属性配置文件:

offset.flush.timeout.ms=60000
offset.flush.interval.ms=10000
max.request.size=10485760

Debezium configs通过curl请求对其进行初始化:

max.queue.size = 81290
max.batch.size = 20480

我们在暂存MySQL db(~8GB)时没有遇到这个问题,因为数据集要小得多。对于生产数据集(~80GB),我们必须调整这些配置。

希望这有帮助。

郝昊天
2023-03-14

这是一个非常复杂的问题——首先,Debezium Docker映像的默认内存设置相当低,所以如果你使用它们,可能有必要增加它们。

接下来,有多个因素在起作用。我建议做以下步骤。

  1. 增加max.batch。大小最大队列。大小-减少提交次数

不幸的是,Kafka-6551在后台潜伏着一个问题,仍然可能造成严重破坏。

 类似资料:
  • 我制作了一个非常简单的登录和会话结构,以便在未来基于JSP的应用程序中重用。是这样的: 而位于WebContent根目录的login.jsp页面有一个 表单,其中包含用于身份验证的专用innerHTML和一个用于接收会话超时或登录失败消息的${failure}字段。 这个结构对我来说很管用。它拦截、请求登录、检查会话和身份验证,等等,但有一个小缺陷:如果您在登录页面并在超时后刷新它(F5或在URL

  • 我读到过NAT路由器“假设一个连接已经终止,如果一段时间内没有发送数据。” 我还读到TCP保活数据包通常不应该包含任何数据。 所以我的问题是: 上述说法是否正确? NAT路由器在重新排序/清理表时是否考虑空TCP保活数据包? 我之所以这么问是因为我需要两个endpoint之间的可靠连接,其中两个endpoint都必须能够检测连接问题并对其作出反应。我知道我自己可能只是实现一个keepalive机制

  • 我有一个奇怪的问题与Kafka->elasticsearch连接器。当我第一次启动时,我在elasticsearch中收到了一个新数据,并通过kibana dashboard进行了检查,但当我使用同一个producer应用程序在kafka中生成新数据并尝试再启动一次connector时,我在elasticsearch中没有得到任何新数据。现在我遇到了这样的错误: 我正在使用next命令运行连接器:

  • 我有以下docker撰写。我的Debezium连接器的yml配置: 当我想创建连接器时,我收到以下错误: Debezium连接器显示此错误: 我已经设置了字段<代码>数据库的值。密码如您所见,但我收到了数据库密码的错误。

  • 我到处找,但很少有人有这个问题。 在使用外部系统时出现异常:java.lang.assertionerror位于org.jetbrains.plugins.gradle.service.project.baseGradleProjectResolverExtension.populateModuleContentRoot(baseGradleProjectResolverExtension.jav

  • 我试图使文档中的狗/品种示例适用于Neo4j持久性。数据库是正常创建的,但无法刷新数据,至少看起来是这样。我在em.flush()指令(在上面的DogBreedRunner.java类中)中得到错误org.neo4j.graphdb.NotInTransactionException。 我谷歌并搜索了这个问题,但真的无法绕过它。这是我的项目(我没有接触文档中的java类)。 有人能帮忙吗?如果需要