当前位置: 首页 > 知识库问答 >
问题:

如何检测spring kafka使用者何时停止从1个分区获取消息?

裴华荣
2023-03-14

我有3个spring kafka消费者(同一组)从3个分区获得消息。我想检测其中一个使用者何时停止从一个分区读取(其他两个使用者继续从其他两个分区读取)。到目前为止,这种情况已经发生过两次,当检测到时,很容易通过重新启动所有消费者来修复,这将导致重新平衡。问题是在这两种情况下,早点知道会很好。所以我尝试使用ListenerContainerIdleEvent,如下所示-

@EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("idle event fired! listnerId=" + event.getListenerId());

    Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
    partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
}

这是我的测试结果-

当一个使用者被分配到多个分区时,当没有从一个分区读取消息(无论出于什么原因……使用者问题或没有消息可从分区读取)时,我是否可以得到通知?

更新:03/27/2018

我不确定我是否应该问一个与此相关的新问题,所以试图先扩展这个问题。我有一个消费者消费从1个主题与3个分区。我已经将IdleEventInterval=30secs设置为。每隔30秒,我就会收到以下日志消息。

  @EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("No messages received for " + event.getIdleTime() + " milliseconds");

Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
partitions.forEach(p ->
    LOG.info("partition: " + p.partition() + " topic:" + p.topic()));

}

1)为什么该事件每30秒调用4次?

2)为什么每个消息集的分区信息不一致?有时没有分区信息,有时同一个消息集中的分区重复,等等。

共有1个答案

谈旺
2023-03-14

...没有可从分区读取的消息

如果并发度为1,且有3个分区,则所有三个分区将由同一个使用者处理。如果一个使用者被分配到一个以上的主题,并且在一段时间内没有收到来自特定主题的消息,框架中目前没有任何东西可以发布事件。

如果将容器并发性增加到3,则将有3个使用者-每个分区一个。如果事件处于空闲状态,则每个使用者将发布该事件。监听器必须是线程安全的,因为将有3个线程调用它,而且通常是并发的。

如果一个主题中有消息,而该主题被分配给的使用者没有接收到这些消息,但该使用者却在积极地从其他分区接收消息,这将是相当奇怪的。这需要Kafka家族的帮助。

 类似资料:
  • 我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下

  • 我刚刚注意到,当我在分区中生成单个消息时,我的使用者不会收到它。只有在我在同一分区中生成了更多的消息之后,使用者才会收到它们。我的数设置为 1。 是否有其他一些配置可能会影响这里? 每个分区都有一个专用的消费者。 相关部件的使用者代码。我的使用者为 定义的不同主题启动多个线程。使用 https://github.com/mmustala/rdkafka-ruby 这是原始消费宝石的叉子。我添加了一

  • 问题内容: 好的。 我可以从我的理解中得知某人正在滚动。因此,我试图找出有人停下来时该如何捕捉。从上面的示例中,您可以看到发生滚动时,我正在从一组元素中删除一个类。但是,我想在用户停止滚动时重新打开该类。 这样做的原因是,我打算在页面滚动时进行中转节目,以使页面具有我正在尝试的特殊效果。但是我尝试在滚动时删除的一个类与该效果冲突,因为它对某种性质是透明的。 问题答案: 更新资料 我写了一个扩展来增

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。