Question:

Kafka Connect sink configuration issue: "Ignoring invalid task provided offset -- partition not assigned"

魏烨熠
2023-03-14

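We are trying to run Kafka Connect workers on GCP using Kubernetes, with a source connector configured on PostgreSQL, a sink connector syncing to BigQuery, and managed Confluent Kafka. The Kafka offset, config, and status topics are configured according to the specification, with 25, 1, and 5 partitions respectively, a compact cleanup policy, and 7-day retention.

The connectors are started through the REST API. The source connector appears to work fine, but after a while the sink connector starts logging these warnings: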

[2021-09-06 08:13:12,429] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=500, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.schema.table-1, com_sync_master_dev.schema.table-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)

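In addition, every restart of the sink connector starts from the beginning, as if it cannot read the offsets to resume from.

Before the problem occurs, the broker loses connection, the connector stops, and then a rebalance starts.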


[2021-09-09 07:55:51,291] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 5: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,298] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,300] DEBUG Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask)
[2021-09-09 07:55:51,301] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,302] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:56,732] DEBUG re-attempting insertion (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,735] DEBUG table insertion completed successfully (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,739] DEBUG Wrote 500 rows over 1 successful calls and 0 failed calls. (com.wepay.kafka.connect.bigquery.write.batch.TableWriter)
[2021-09-09 07:55:56,736] INFO [Worker clientId=connect-1, groupId=database-sync] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-fd48e893-1729-4df4-8d1e-3370c1e76e1f', leaderUrl='http://confluent-bigquery-connect:8083/', offset=555, connectorIds=[master-gcp-bq-sink, master-gcp-source], taskIds=[master-gcp-bq-sink-0, master-gcp-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)

The sink connector's offsets always restart from 0, and the WorkerSinkTask skips the last commit. Logs:

[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1345: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,281] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,300] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,595] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,795] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,817] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
...
[2021-09-09 08:01:03,158] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,168] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,381] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,410] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,762] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
....
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-0/OffsetAndMetadata{offset=395300, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=380428, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)

Source configuration:

{
"name": "master-gcp-source",
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.basic.auth.credentials.source": "******",
  "key.converter.schema.registry.basic.auth.user.info":"*****",
  "key.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "errors.tolerance": "none",
  "errors.deadletterqueue.topic.name":"dlq_postgres_source",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "errors.deadletterqueue.context.headers.enable":true,
  "errors.log.enable":true,
  "errors.log.include.messages":true,
  "value.converter.basic.auth.credentials.source": "******",
  "value.converter.schema.registry.basic.auth.user.info":"***************",
  "value.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
  "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
  "database.hostname": "hostname",
  "database.port": "5432",
  "database.user": "some_db_user",
  "database.password": "***********",
  "database.dbname" : "master",
  "database.server.name": "com_master_dev",
  "database.sslmode": "require",
  "table.include.list": "schema.table",
  "table.ignore.builtin": true,
  "heartbeat.interval.ms": "5000",
  "tasks.max": "1",
  "slot.drop.on.stop": false,
  "xmin.fetch.interval.ms": 0,
  "interval.handling.mode": "numeric",
  "binary.handling.mode": "bytes",
  "sanitize.field.names": true,
  "slot.max.retries":6,
  "slot.retry.delay.ms": 10000,
  "event.processing.failure.handling.mode": "fail",
  "slot.name": "debezium",
  "publication.name": "dbz_publication",
  "decimal.handling.mode": "precise",
  "snapshot.lock.timeout.ms": "10000",
  "snapshot.mode":"initial",
  "output.data.format": "AVRO",
  "transforms": "unwrap",
  "offset.flush.interval.ms": "0",
  "offset.flush.timeout.ms" : "20000",
  "max.batch.size": "1024",
  "max.queue.size":"4096",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}

Sink configuration:

{
"name": "master-gcp-bq-sink",
"config": {
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.basic.auth.credentials.source": "*********",
  "key.converter.schema.registry.basic.auth.user.info":"************",
  "key.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.basic.auth.credentials.source": "*******",
  "value.converter.schema.registry.basic.auth.user.info":"****************************",
  "value.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
  "config.action.reload": "restart",
  "topics": "com_master_dev.schema.table",
  "project": "dev",
  "defaultDataset": "schema",
  "keyfile": "{********}",
  "keySource": "JSON",
  "errors.tolerance": "none",
  "errors.deadletterqueue.topic.name":"dlq_bigquery_sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable":true,
  "errors.log.enable":true,
  "errors.log.include.messages":true,
   "data.format":"AVRO",
  "upsertEnabled": true,
  "deleteEnabled": false,
  "allowNewBigQueryFields": "true",
  "sanitizeTopics": true,
  "sanitizeFieldNames": true,
  "autoCreateTables": true,
  "timePartitioningType": "DAY",
  "kafkaKeyFieldName":"key_placeholder",
  "mergeIntervalMs": "60000",
  "mergeRecordsThreshold": "-1",
  "transforms": "unwrap",
  "consumer.override.session.timeout.ms":"60000",
  "consumer.override.fetch.max.bytes": "1048576",
  "consumer.override.request.timeout.ms":"60000",   
  "consumer.override.reconnect.backoff.max.ms":"10000",
  "consumer.override.reconnect.backoff.ms":"250",
  "consumer.override.partition.assignment.strategy":"org.apache.kafka.clients.consumer.CooperativeStickyAssignor", // also tried with RoundRobinAssignor
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms": "RegexTransformation",
  "transforms.RegexTransformation.type":"org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.RegexTransformation.regex":"(com_sync_master_dev.schema.)(.*)",
  "transforms.RegexTransformation.replacement": "$2"
}
}
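
One detail in the sink configuration above is that the key "transforms" appears twice, once as "unwrap" and once as "RegexTransformation". The following is a minimal Python sketch for spotting such duplicates before a configuration is submitted. The function name `find_duplicate_keys` and the shortened `SINK_EXCERPT` are hypothetical and only keep the relevant keys. How the Connect REST API resolves a duplicate is not stated in the question; Python's `json` module keeps the last value, which would leave only "RegexTransformation" active.

```python
import json
from collections import Counter


def find_duplicate_keys(text):
    """Return the keys that occur more than once inside any single JSON object."""
    duplicates = set()

    def record_pairs(pairs):
        # pairs is the ordered list of (key, value) for one JSON object.
        counts = Counter(key for key, _ in pairs)
        duplicates.update(key for key, n in counts.items() if n > 1)
        # dict() keeps the last value per key, like json.loads does by default.
        return dict(pairs)

    json.loads(text, object_pairs_hook=record_pairs)
    return duplicates


# Hypothetical excerpt: only the transform-related keys of the sink configuration.
SINK_EXCERPT = """
{
  "name": "master-gcp-bq-sink",
  "config": {
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms": "RegexTransformation",
    "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter"
  }
}
"""

if __name__ == "__main__":
    print("duplicate keys:", find_duplicate_keys(SINK_EXCERPT))
    print("effective value:", json.loads(SINK_EXCERPT)["config"]["transforms"])
```

If both transforms are meant to run, the usual form is a single comma-separated list, for example "transforms": "unwrap,RegexTransformation". Note also that the inline `//` comment in the original configuration is not valid JSON and would have to be removed before parsing.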

What are we missing? How can we avoid the invalid task offsets and make sure the sink connector resumes from its previous offset?

1 answer

云承弼
2023-03-14

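So, in Kafka, you need to configure a consumer group for the consumers of a topic in order to make use of offsets. Otherwise, a failed sink will restart without knowing which offset it had last read while it was still alive.

When a member of a consumer group reads from a topic, it commits its progress to the broker, so that the group knows where to resume reading.

It seems the offset commit is being skipped for some reason, which is why the sink always starts from offset 0.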
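
As a rough illustration of that mechanism, here is a simplified Python model of how a consumer's starting position could be resolved. It is not Kafka client code. The function `starting_position`, the in-memory `committed` dictionary, and the group name `connect-master-gcp-bq-sink` (assuming Connect's default `connect-<connector name>` naming for sink consumer groups) are assumptions made for the example, and the log is assumed to start at offset 0.

```python
def starting_position(committed, group_id, topic_partition,
                      reset_policy="earliest", log_end_offset=0):
    """Return the offset a consumer in group_id would start reading from.

    committed maps (group_id, (topic, partition)) to the last committed offset.
    Without a committed offset, the reset policy decides: "earliest" starts at
    the beginning of the log (assumed to be 0 here), anything else at the end.
    """
    offset = committed.get((group_id, topic_partition))
    if offset is not None:
        return offset
    return 0 if reset_policy == "earliest" else log_end_offset


if __name__ == "__main__":
    group = "connect-master-gcp-bq-sink"  # assumed default group name
    tp = ("com_sync_master_dev.someshema.sometable", 0)

    # Nothing was ever committed: the task starts from the beginning again.
    print(starting_position({}, group, tp))

    # A commit for this group and partition lets the task resume from it.
    print(starting_position({(group, tp): 395300}, group, tp))
```

In this model the only way to end up at offset 0 after every restart is for no commit to exist under the group and partition being looked up, which is consistent with the "Assigned topic partition ... with offset 0" lines in the question.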
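
The warnings in the question hint at one possible reason. The offsets returned by the task are keyed by `sometable-0` and `sometable-1`, while the assigned partitions are `com_sync_master_dev.someshema.sometable-0` and `-1`. The short name matches what the RegexRouter transform in the sink configuration would produce, although the question does not confirm that this is the cause. The Python sketch below is a simplified model of the check described by the warning, not the actual WorkerSinkTask code. The function name `filter_task_offsets` is hypothetical; the topic names and offsets are taken from the logs.

```python
def filter_task_offsets(task_offsets, assignment):
    """Split task-provided offsets into committable and ignored ones.

    task_offsets maps (topic, partition) to an offset; assignment is the set
    of (topic, partition) pairs currently assigned to the task. Offsets for
    partitions outside the assignment are ignored rather than committed.
    """
    committable, ignored = {}, {}
    for topic_partition, offset in task_offsets.items():
        if topic_partition in assignment:
            committable[topic_partition] = offset
        else:
            ignored[topic_partition] = offset
    return committable, ignored


if __name__ == "__main__":
    assignment = {
        ("com_sync_master_dev.someshema.sometable", 0),
        ("com_sync_master_dev.someshema.sometable", 1),
    }
    # Offsets as reported in the WARN lines, keyed by the shortened topic name.
    task_offsets = {("sometable", 0): 395300, ("sometable", 1): 380428}

    committable, ignored = filter_task_offsets(task_offsets, assignment)
    print("committable:", committable)
    print("ignored:", ignored)
```

With the values from the logs, every offset lands in `ignored` and nothing is left to commit, so the next restart would find no committed offset to resume from. If the rename is indeed the cause, the fix would be for the offsets to be reported under the original topic name, for example by dropping the RegexRouter transform or using a connector-side option for table naming; both are suggestions to verify, not confirmed solutions.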
