当前位置: 首页 > 知识库问答 >
问题:

Kafka-如何检查消费者是否活着,如果不是如何将消费者带回运行状态?

和和煦
2023-03-14

我目前正在做一个kafka java项目。我是新来的,我发现很难理解与Kafka生产者/消费者设计相关的几个基本概念。

>

  • 比方说,我有一个带有单个分区的主题,我有一个生产者正在写这个主题,一个消费者正在从这个主题中消费。如果我部署同一个应用程序的多个实例,每个实例将运行自己的消费者。在这种情况下,因为所有消费者都属于同一个group pId,所以消息是否会在多个实例上运行的消费者之间平均分配?

    如何从应用程序中定期检查消费者是否还活着?。

    请澄清上述问题。如果我的任何/所有假设/理解都是错误的,请纠正我。我知道我没有分享任何代码示例,因为这些都是概念性的问题。如果需要,我可以共享代码片段。

  • 共有2个答案

    宇文念
    2023-03-14

    >

  • Kafka只允许每个分区和用户组有一个用户。因此,多个消费者将等待主消费者倒下,然后在该点跳入,但对于给定的分区/消费者组,一次只有一个消费者在消费。

    您可以检查消费者组偏移是否与最大偏移保持一致,以查看是否存在滞后。如果一个人开始有问题,Kafka在循环多个消费者方面做得很好

    重启应用程序,它会从中断的地方开始,只是不要从一开始就重启消息。Kafka为你处理。

    我们用Spring靴搭配SpringKafka。没有“最佳”方法,但我们发现使用spring进行设置和维护很容易。

  • 方飞鸣
    2023-03-14

    >

  • 您说的主题使用单个分区意味着它无法将消息分发到多个分区。你会失去Kafka的一大优势。你必须增加一个以上的分区。如果您部署了同一个应用程序的多个实例,则分发没有帮助,因为消息发布到您提到的一个分区中,并且只有一个实例将仅分配给该分区,其他实例将处于空闲状态。

    您可以使用AdminClient Kafka API检查您的消费者是否存在任何滞后

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9091");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    AdminClient client = org.apache.kafka.clients.admin.AdminClient.create(props);
    ListConsumerGroupOffsetsResult offsets = client.listConsumerGroupOffsets("consumerId");
    Map<TopicPartition, OffsetAndMetadata> tt = offsets.partitionsToOffsetAndMetadata().get();
    ListConsumerGroupOffsetsResult offsets = client.listConsumerGroupOffsets(consumerId);
    Map<TopicPartition, OffsetAndMetadata> tt = offsets.partitionsToOffsetAndMetadata().get();
    for (Entry<TopicPartition, OffsetAndMetadata> entry : tt.entrySet()) {TopicPartition tp = entry.getKey();
      OffsetAndMetadata op = entry.getValue();
      Collections.singletonList(tp);
      consumer.assign(Collections.singletonList(tp));
      consumer.seekToEnd(Collections.singletonList(tp));
      System.out.println(consumerId + "," + tp.partition() + "," + consumer.position(tp) + ","
                                + op.offset() + "," + (consumer.position(tp) - op.offset()));
    }
    

    您还没有说明在何处部署,但如果您在mesos中使用marathon进行部署,它将自动重新启动。您可以手动重新启动,如果您使用与上一个相同的组id,您的应用程序将开始使用它离开的位置。

  •  类似资料:
    • 谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但

    • 我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?

    • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

    • 我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #

    • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。

    • 我想描述以下场景:我有一个节点。js后端应用程序(它使用单线程事件循环)。这是系统的总体架构:Producer- 假设制作者向Kafka发送了一条消息,这条消息的目的是在数据库中进行某个查询并检索查询结果。但是,众所周知Kafka是一个异步系统。如果制作者向Kafka发送消息,它会得到一个响应,表明该消息已被Kafka经纪人接受。Kafka broker不会等到消费者轮询消息并处理它。 在这种情况