当前位置: 首页 > 面试题库 >

KafkaConsumer 0.10 Java API错误消息:没有当前分区分配

邹慈
2023-03-14
问题内容

我正在使用KafkaConsumer 0.10 Java
api。我想从特定的分区和特定的偏移量中消费。我抬起头,发现有一个搜索方法,但是抛出异常。任何人都有类似的用例或解决方案?

码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

例外

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

问题答案:

你可以之前seek(),你首先需要subscribe()一个主题
assign()主题,以消费者的分区。也请记住,这subscribe()assign()懒惰-
这样,你也需要做一个“虚拟来电”,以poll()才可以使用seek()

注意:从Kafka 2.0开始,新版本poll(Duration timeout)是异步的,不能保证您在poll返回时可以完成完整的分配。因此,您可能需要在使用前检查您的分配,seek()poll再次刷新该分配。(有关详细信息,请参见KIP-266)

如果您使用subscribe(),则使用组管理:这样,您可以使用同一组启动多个使用者,group.id并且该主题的所有分区将自动在组内的所有使用者上平均分配(每个分区将分配给组中的单个使用者)

如果要读取特定的分区,则需要通过进行手动分配assign()。这使您可以执行所需的任何作业。

顺便说一句:KafkaConsumer非常长的JavaDoc类,包括示例。值得一读。



 类似资料:
  • 我想知道,在什么情况下,具有相同分区键的消息会进入不同的分区。 我使用下面给出的命令运行了属于同一组的两个消费者在控制台中监听一个主题: 我使用“纳米/Kafka-php”库将消息放入带有键 的主题。当我发送多个这样的消息时,我发现很少有消息转到第二个消费者,而大多数消息都发送给消费者1。 由于我对所有消息使用相同的密钥,因此我希望所有消息都由同一个使用者使用。每个使用者都绑定到每个分区。 我使用

  • 我使用github的时间相对较短,并且一直使用客户端执行提交和拉取。我决定从昨天的git bash开始尝试它,并且我成功地创建了一个新的repo和提交的文件。 今天,我从另一台计算机上对存储库进行了更改,我提交了更改,现在我回到家里,执行了来更新我的本地版本,我得到了以下信息: 这次回购的唯一贡献者是我,没有分支(只有一个主人)。我在windows上执行了git Bash中的pull: 我做错了什

  • 我刚刚注意到,当我在分区中生成单个消息时,我的使用者不会收到它。只有在我在同一分区中生成了更多的消息之后,使用者才会收到它们。我的数设置为 1。 是否有其他一些配置可能会影响这里? 每个分区都有一个专用的消费者。 相关部件的使用者代码。我的使用者为 定义的不同主题启动多个线程。使用 https://github.com/mmustala/rdkafka-ruby 这是原始消费宝石的叉子。我添加了一

  • 我在库伯内特斯(8-16个节点,自动缩放)上运行Kafka Connect。我总共定义了44个连接器,每个Kafka主题一个(每个主题一个分区)。这些主题是由Debezium/Postgreql生成的。有3个Kafka节点。每个连接器tasks.max设置为4。我的大多数连接器(但不是每个!)有一个(总是一个)失败任务,由于java.lang.IllegalStateExc的:分区-0没有当前分配

  • 我在同一个消费者组上启动了两个消费者,我订阅了20个主题(每个主题只有一个分区) 仅在消费者上使用: kafka消费者组--引导服务器XXXXX:9092--组foo--描述--成员--详细 我做错了什么?

  • 我有以下代码: 在错误处理和从恐慌中恢复方面,他们的东西是我遗漏的。我是新来的,所以想要一些建议。