当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者:可以在一个轮询()调用中执行onPartionsRevoked和onPartionsAssigned回调吗?

符鸣
2023-03-14

我使用的是Kafka消费者 api 0.10.2.1。

Kafka 消费者为分区分配和撤销提供了回调:

consumer.subscribe(topics, consumerRebalanceListener);

其中< code > consumerRebalanceListener 有两个方法:

public void onPartitionsRevoked(Collection<TopicPartition> partitions);
public void onPartitionsAssigned(Collection<TopicPartition> partitions);

由于Kafka消费者中的所有内容都发生在单线程中,并且在轮询()方法中,这些回调是从轮询()方法中调用的。问题是,它们可以从一个轮询()调用中调用吗?还是它们总是需要两个单独的轮询()调用?

共有1个答案

丁鸿云
2023-03-14

在我看来,它们是从on轮询()调用调用的。当消费者实例开始加入该组时,首先调用onPartionsRevoked来撤销分配给该实例的所有分区,并发送JoinGroup请求。然后它无限期阻塞,直到收到响应。如果它成功加入组,它会通过调用onPartionsAssign来执行用户的回调。所有这些都在一轮轮询中完成。

 类似资料:
  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 关于KafkaConsumer(>=0.9),我在尝试实现满足自己需求的解决方案时遇到了一些严重的问题。 假设我有一个函数,它只能从一个Kafka主题中读取n条消息。 例如:-->获取主题中的下5条kafka消息。 所以,我有一个类似这样的循环。用实际正确的参数编辑。在本例中,使用者的max参数设置为1,因此实际循环只循环一次。不同的消费者(他们中的一些人迭代了许多消息)共享一个抽象的父亲(这一个

  • 我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。 如有任何帮助,我们将不胜感激

  • 我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。

  • 我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。

  • 这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。