当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect:Java . lang . illegalstateexception:没有分区的当前赋值

高宇定
2023-03-14

我在库伯内特斯(8-16个节点,自动缩放)上运行Kafka Connect。我总共定义了44个连接器,每个Kafka主题一个(每个主题一个分区)。这些主题是由Debezium/Postgreql生成的。有3个Kafka节点。每个连接器tasks.max设置为4。我的大多数连接器(但不是每个!)有一个(总是一个)失败任务,由于java.lang.IllegalStateExc的:分区-0没有当前分配。

这里不是Kafka专家,注意;)我假设有3个Kafka节点,所以3个工人做得很好,而第4个任务没有任何连接,所以它失败了。但是为什么有时有4个任务运行良好呢?

此外,我经常遇到“由于重新平衡而导致的冲突操作”问题,这可能会发生几分钟甚至几个小时。最近我删除了所有pod,它们自己重新启动,问题消失了,但这不是长期的解决方案。

任务是什么。最大推荐值?提前感谢!

例外情况:

java.lang.IllegalStateException: No current assignment for partition table-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748

接收器连接器配置:

connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas   true
sanitizeTopics  true
autoCreateTables    true
topics  <topic-name>
tasks.max   3
schemaRegistryLocation  http://<ip>:8081
project <big-query-project>
maxWriteSize    10000
datasets    .*=<big-query-dataset>
task.class  com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name    <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait  1000
bufferSize  100000

并且它抛出了上面的异常<code>java.lang.IllegalStateException:没有当前的〔…〕赋值</code>

共有1个答案

东郭鹤龄
2023-03-14

属性任务的值.max取决于几个因素。最重要的是特定的连接器。特定连接器取决于其逻辑和任务值.max计算将要创建的任务数。例如,FileStreamSourceConnector 始终创建 1 个任务,因此即使您传递的值大于 1,它也只会创建一个。与PostgresConnector平行的情况相同。

tasks.max值还应取决于其他因素,如:Kafka Connect模式、您拥有多少Kafka Connect实例、机器的CPU等。

我怎么理解你在用source connector(< code > PostgresConnector )。源连接器不轮询来自Kafka的数据。您发布的异常与某个< code>SinkConnector有关。如果使用的是< code>SinkConnector,则< code>tasks.max不应超过分区数。如果您启动的任务多于分区数量,一些分区将处于空闲状态(状态为正在运行,但它们不处理数据),重新平衡可能会发生。

 类似资料:
  • 问题内容: 我正在使用KafkaConsumer 0.10 Java api。我想从特定的分区和特定的偏移量中消费。我抬起头,发现有一个搜索方法,但是抛出异常。任何人都有类似的用例或解决方案? 码: 例外 问题答案: 你可以之前,你首先需要一个主题 或 主题,以消费者的分区。也请记住,这和懒惰- 这样,你也需要做一个“虚拟来电”,以才可以使用。 注意:从Kafka 2.0开始,新版本是异步的,不能

  • 我使用github的时间相对较短,并且一直使用客户端执行提交和拉取。我决定从昨天的git bash开始尝试它,并且我成功地创建了一个新的repo和提交的文件。 今天,我从另一台计算机上对存储库进行了更改,我提交了更改,现在我回到家里,执行了来更新我的本地版本,我得到了以下信息: 这次回购的唯一贡献者是我,没有分支(只有一个主人)。我在windows上执行了git Bash中的pull: 我做错了什

  • 然后我试了一下: 有什么提示吗?

  • 我试图在Hazelcast 3.8.8中建立分区组。我的主要目标是将驻留在2台物理机器中的4个集群成员分为2个分区组。当我启用分区组时,它似乎不起作用,组也没有建立。您能告诉我启用分区组缺少什么吗? 我试图通过hazelcast启用分区分组。xml。使用group type=“CUSTOM”进行测试,并将驻留在my local和我们的服务器中的成员分为两个不同的成员组。成员组成了一个集群,但似乎没

  • 问题内容: 使用“新”样式类(我在python 3.2中)是否可以将一个类拆分为多个文件?我有一个大类(从面向对象的设计角度来看,考虑耦合等等,它实际上应该是一个类,但是为了方便编辑类,最好分割几个文件。 问题答案: 如果您的问题确实只是在编辑器中使用大型类,那么我真正寻找的第一个解决方案是解决问题的更好方法。第二种解决方案是更好的编辑器,最好是带有代码折叠的编辑器。 也就是说,有几种方法可以将一

  • 早上好, 当我从Hibernate3升级到4时,出现了“臭名昭著的”当前会话错误。我搜索了一下,似乎一切都安排妥当了。我需要一些洞察力:) 这是我的配置:Spring3.2Hibernate4.1.9 数据源上下文: 业务背景: 我的服务层: 最后,DAO实现: 这里是stackTrace: 谢谢你的帮助! 编辑: 好的,我清理了配置文件,现在: 对于DAO: 对于业务层: 并从DAO中删除@Tr