We are running Kafka Connect workers on GCP with Kubernetes, a source connector against PostgreSQL, a sink connector syncing to BigQuery, and managed Confluent Kafka. The Connect offsets, config, and status topics are configured as documented, with 25, 1, and 5 partitions respectively, a compact cleanup policy, and 7-day retention. The connectors are started through the Connect REST API, roughly as follows (a sketch; the worker URL is taken from the logs below and the payload file names are placeholders):
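# Create the connectors on the Connect worker (a sketch; the JSON files
# would contain the source and sink configs shown at the end of this question)
curl -X POST -H "Content-Type: application/json" \
  --data @master-gcp-source.json \
  http://confluent-bigquery-connect:8083/connectors
curl -X POST -H "Content-Type: application/json" \
  --data @master-gcp-bq-sink.json \
  http://confluent-bigquery-connect:8083/connectors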
The source connector appears to work fine, but after a while the sink connector starts logging warnings like this:
[2021-09-06 08:13:12,429] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=500, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.schema.table-1, com_sync_master_dev.schema.table-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
In addition, every restart of the sink connector starts from the beginning, as if it could not read an offset to resume from.
Before the problem appears, the broker connection is lost, the connectors stop, and a rebalance begins:
[2021-09-09 07:55:51,291] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 5: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,298] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,300] DEBUG Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask)
[2021-09-09 07:55:51,301] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,302] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:56,732] DEBUG re-attempting insertion (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,735] DEBUG table insertion completed successfully (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,739] DEBUG Wrote 500 rows over 1 successful calls and 0 failed calls. (com.wepay.kafka.connect.bigquery.write.batch.TableWriter)
[2021-09-09 07:55:56,736] INFO [Worker clientId=connect-1, groupId=database-sync] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-fd48e893-1729-4df4-8d1e-3370c1e76e1f', leaderUrl='http://confluent-bigquery-connect:8083/', offset=555, connectorIds=[master-gcp-bq-sink, master-gcp-source], taskIds=[master-gcp-bq-sink-0, master-gcp-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
The sink connector's offset always restarts from 0, and the WorkerSinkTask skips the last commit. Logs:
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1345: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,281] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,300] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,595] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,795] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,817] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
...
[2021-09-09 08:01:03,158] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,168] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,381] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,410] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,762] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
...
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-0/OffsetAndMetadata{offset=395300, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=380428, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
Source config:
{
  "name": "master-gcp-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.basic.auth.credentials.source": "******",
    "key.converter.schema.registry.basic.auth.user.info": "*****",
    "key.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "dlq_postgres_source",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "value.converter.basic.auth.credentials.source": "******",
    "value.converter.schema.registry.basic.auth.user.info": "***************",
    "value.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "database.hostname": "hostname",
    "database.port": "5432",
    "database.user": "some_db_user",
    "database.password": "***********",
    "database.dbname": "master",
    "database.server.name": "com_master_dev",
    "database.sslmode": "require",
    "table.include.list": "schema.table",
    "table.ignore.builtin": true,
    "heartbeat.interval.ms": "5000",
    "tasks.max": "1",
    "slot.drop.on.stop": false,
    "xmin.fetch.interval.ms": 0,
    "interval.handling.mode": "numeric",
    "binary.handling.mode": "bytes",
    "sanitize.field.names": true,
    "slot.max.retries": 6,
    "slot.retry.delay.ms": 10000,
    "event.processing.failure.handling.mode": "fail",
    "slot.name": "debezium",
    "publication.name": "dbz_publication",
    "decimal.handling.mode": "precise",
    "snapshot.lock.timeout.ms": "10000",
    "snapshot.mode": "initial",
    "output.data.format": "AVRO",
    "transforms": "unwrap",
    "offset.flush.interval.ms": "0",
    "offset.flush.timeout.ms": "20000",
    "max.batch.size": "1024",
    "max.queue.size": "4096",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
Sink config:
{
  "name": "master-gcp-bq-sink",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.basic.auth.credentials.source": "*********",
    "key.converter.schema.registry.basic.auth.user.info": "************",
    "key.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "*******",
    "value.converter.schema.registry.basic.auth.user.info": "****************************",
    "value.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
    "config.action.reload": "restart",
    "topics": "com_master_dev.schema.table",
    "project": "dev",
    "defaultDataset": "schema",
    "keyfile": "{********}",
    "keySource": "JSON",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "dlq_bigquery_sink",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "data.format": "AVRO",
    "upsertEnabled": true,
    "deleteEnabled": false,
    "allowNewBigQueryFields": "true",
    "sanitizeTopics": true,
    "sanitizeFieldNames": true,
    "autoCreateTables": true,
    "timePartitioningType": "DAY",
    "kafkaKeyFieldName": "key_placeholder",
    "mergeIntervalMs": "60000",
    "mergeRecordsThreshold": "-1",
    "transforms": "unwrap",
    "consumer.override.session.timeout.ms": "60000",
    "consumer.override.fetch.max.bytes": "1048576",
    "consumer.override.request.timeout.ms": "60000",
    "consumer.override.reconnect.backoff.max.ms": "10000",
    "consumer.override.reconnect.backoff.ms": "250",
    "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.CooperativeStickyAssignor", // also tried with RoundRobinAssignor
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms": "RegexTransformation",
    "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RegexTransformation.regex": "(com_sync_master_dev.schema.)(.*)",
    "transforms.RegexTransformation.replacement": "$2"
  }
}
What are we missing? How can we avoid the invalid task offsets and make sure the sink connector resumes from its previously committed offset?
In Kafka, a consumer needs a consumer group to make use of committed offsets; without a committed offset, a failed sink restarts with no idea how far it had read the last time it was alive. Kafka Connect sets this up automatically: every sink connector consumes with a group named connect-<connector-name>, and as the group members read from the topic they commit their progress to the brokers, so a restarted task knows where to resume. (For sink connectors these offsets live in the ordinary consumer-group mechanism; Connect's internal offsets topic is only used by source connectors.)
In your logs the offset commit is being skipped ("Skipping offset commit, task opted-out by returning no offsets from preCommit"), so nothing is ever committed, and on restart the consumer falls back to auto.offset.reset, which Connect defaults to earliest for sink tasks. That is why the sink always starts from offset 0. The "Ignoring invalid task provided offset" warnings hint at why the commits never succeed: the offsets the task does return are keyed by the rewritten topic name (sometable-0/1, i.e. the name after your RegexRouter transform), which does not match the partitions actually assigned to the consumer (com_sync_master_dev.someshema.sometable-0/1), so the framework discards them.
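To confirm, you can check whether the sink's consumer group has committed anything, e.g. with the Kafka CLI (a sketch; the broker address and the client.properties file with your Confluent Cloud SASL/SSL credentials are placeholders):
# Connect names the sink's group "connect-<connector name>"
kafka-consumer-groups --bootstrap-server <broker>.europe-west3.gcp.confluent.cloud:9092 \
  --command-config client.properties \
  --describe --group connect-master-gcp-bq-sink
If CURRENT-OFFSET shows "-" for the assigned partitions while LAG keeps growing, the task is consuming but never committing, which matches the behaviour above.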