I think I may be missing some configuration, but we are trying to use Debezium to snapshot all rows of a table with about 8 million records, and after a while it simply stops.
The connector config is:
{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.user":"MyUser",
"database.server.id":"12345",
"tasks.max":"1",
"database.history.kafka.bootstrap.servers":"MyKafka:9092",
"database.history.kafka.topic":"MyConnectorHistory",
"database.server.name":"MyDbName",
"database.port":"3306",
"table.whitelist":"BigTable",
"decimal.handling.mode":"double",
"database.hostname":"***",
"database.password":"***",
"name":"MyConnector",
"database.whitelist":"MyDb",
"snapshot.mode":"initial_only",
"connect.timeout.ms":"60000"
}
The connector starts scanning the rows:
April 24th 2019, 13:06:52.573 2019-04-24 16:06:52,569 INFO MySQL|MyDbName|snapshot Step 9: - 2040000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:59:29.129 [io.debezium.connector.mysql.SnapshotReader]
... other prints
April 24th 2019, 12:17:28.448 2019-04-24 15:17:28,447 INFO MySQL|MyDbName|snapshot Step 9: - 50000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:10:05.008 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:43.183 2019-04-24 15:07:43,183 INFO MySQL|MyDbName|snapshot Step 9: - 40000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:19.744 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:36.499 2019-04-24 15:07:36,498 INFO MySQL|MyDbName|snapshot Step 9: - 30000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:13.059 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:30.157 2019-04-24 15:07:30,157 INFO MySQL|MyDbName|snapshot Step 9: - 20000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:06.718 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:25.116 2019-04-24 15:07:25,116 INFO MySQL|MyDbName|snapshot Step 9: - 10000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:01.677 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.439 2019-04-24 15:07:23,439 INFO MySQL|MyDbName|snapshot Step 9: - scanning table 'MyDb.BigTable' (1 of 10 tables) [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.427 2019-04-24 15:07:23,427 INFO MySQL|MyDbName|snapshot Step 8: tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables. [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.427 2019-04-24 15:07:23,427 INFO MySQL|MyDbName|snapshot Step 9: scanning contents of 10 tables while still in transaction [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.143 2019-04-24 15:07:23,143 INFO MySQL|MyDbName|snapshot Step 7: generating DROP and CREATE statements to reflect current database schemas: [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.142 2019-04-24 15:07:23,142 INFO MySQL|MyDbName|snapshot Step 6: read binlog position of MySQL master [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.739 2019-04-24 15:07:22,739 INFO MySQL|MyDbName|snapshot Step 5: flush and obtain read lock for 10 tables (preventing writes) [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.635 2019-04-24 15:07:22,635 INFO MySQL|MyDbName|snapshot Step 4: read list of available tables in each database [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.633 2019-04-24 15:07:22,633 INFO MySQL|MyDbName|snapshot Step 3: read list of available databases [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.632 2019-04-24 15:07:22,632 INFO MySQL|MyDbName|snapshot Step 2: start transaction with consistent snapshot [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.632 2019-04-24 15:07:22,631 INFO MySQL|MyDbName|snapshot Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.617 2019-04-24 15:07:22,617 INFO MySQL|MyDbName|snapshot Step 1: flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.SnapshotReader]
Then, after a while, we get:
Failed to flush, timed out while waiting for producer to flush outstanding 4094 messages
Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
After that, the scan stops and we see repeated attempts to flush and commit the offsets:
April 24th 2019, 12:34:08.641 2019-04-24 15:34:08,641 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:34:08.640 2019-04-24 15:34:08,640 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:33:18.640 2019-04-24 15:33:18,640 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:33:18.640 2019-04-24 15:33:18,640 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:32:18.640 2019-04-24 15:32:18,640 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:32:18.639 2019-04-24 15:32:18,639 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:31:28.639 2019-04-24 15:31:28,639 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:31:28.639 2019-04-24 15:31:28,639 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:30:28.639 2019-04-24 15:30:28,639 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:30:28.636 2019-04-24 15:30:28,635 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 652 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:29:38.635 2019-04-24 15:29:38,635 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5556 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:29:38.635 2019-04-24 15:29:38,635 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets
After a while (roughly 9-10 minutes) it seems to succeed and starts scanning rows again. But some time later it fails once more and then, without having finished all the records, the connector changes its status to FAILED.
One of the errors is:
{
"name":"MyConnector",
"connector":{
"state":"RUNNING",
"worker_id":"svc.cluster.local:8083"
},
"tasks":[
{
"state":"FAILED",
"trace":"org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing\n\tat org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:318)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:197)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
"id":0,
"worker_id":"svc.cluster.local:8083"
}
],
"type":"source"
}
I read this issue: https://github.com/confluentinc/kafka-connect-jdbc/issues/161 and tried changing the parameter values as suggested. It got better, but it still fails after some time. Currently my Connect configuration is:
OFFSET_FLUSH_INTERVAL_MS: 60000
OFFSET_FLUSH_TIMEOUT_MS: 50000
CONNECT_PRODUCER_BUFFER_MEMORY: 45554432
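For reference, these environment variables are typically just the worker properties with the dots replaced by underscores (this is an assumption about the Connect image in use; check your container's documentation). The equivalent worker-level properties would look roughly like:
offset.flush.interval.ms=60000
offset.flush.timeout.ms=50000
producer.buffer.memory=45554432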
I also tried the values described in: Debezium flush timeout and OutOfMemoryError errors with MySQL.
One thing I have not tried yet is the snapshot.select.statement.overrides parameter. But I am not sure it would help, because sometimes the offset commit problem already happens at around 100k messages; I have had to resume and stop the connector several times.
I have snapshotted MySQL databases with Debezium where multiple tables had over 30 million records. We also have one table with about 1 million records; for that one I used the snapshot select statement override config (since it is an insert-only table).
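For illustration, a minimal sketch of what that override could look like in the connector JSON, using the table name from the question (the WHERE/ORDER BY clause is made up and would need to fit your schema):
{
"snapshot.select.statement.overrides":"MyDb.BigTable",
"snapshot.select.statement.overrides.MyDb.BigTable":"SELECT * FROM MyDb.BigTable WHERE id > 0 ORDER BY id"
}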
Initially, when snapshotting the database with the default settings, I ran into exactly the same problems you are facing. Tuning the following configs helped resolve the issue for me.
Kafka Connect worker configs, set in the worker.properties config file:
offset.flush.timeout.ms=60000
offset.flush.interval.ms=10000
max.request.size=10485760
Decreasing the offset flush interval lets Kafka Connect flush offsets more frequently, while the large timeout gives it more time to get an acknowledgement for the commit.
Debezium configs, passed in the curl request that initializes the connector:
max.queue.size = 81290
max.batch.size = 20480
The default queue size is 8192, which is quite low for a larger database; bumping up these configs helped a lot.
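If you register the connector over the REST API, these settings go into the same JSON body as the rest of the connector config. A sketch, assuming the Connect REST endpoint is reachable on localhost:8083 and the connector is named MyConnector (a PUT to /connectors/&lt;name&gt;/config replaces the whole config, so keep the existing keys alongside the two new ones):
curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/MyConnector/config -d '{ ...existing connector config..., "max.queue.size":"81290", "max.batch.size":"20480" }'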
Hope this helps with your problem.