当前位置: 首页 > 知识库问答 >
问题:

如何检查Kafka消费者是否准备好了

盖泽宇
2023-03-14

我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?

共有3个答案

凌俊语
2023-03-14

多亏了阿列克谢(我也投了赞成票),我似乎基本上按照同样的想法解决了我的问题。

只是想分享我的经验……在我们的案例中,我们根据要求使用Kafka。

我尝试了…KafkaConsumer。分配()重复(使用Thread.sleep(100)),但似乎没有帮助。添加KafkaConsumer。投票(50)似乎已经为消费者(群体)做好了准备,并且也收到了第一个响应。测试了几次,现在它一直在工作。

顺便说一句,测试需要停止应用程序

PS:只需调用poll(50)不带赋值() 获取逻辑可能无法保证消费者(组)已经准备好了。

燕雨石
2023-03-14

您可以执行以下操作:

我有一个从kafka主题读取数据的测试
所以您不能在多线程环境中使用KafkaConsumer,但您可以传递参数“AtomicReference assignment”,在使用者线程中更新它,然后在另一个线程中读取它例如,项目中的工作代码被截取以进行测试:

    private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // print the topic name
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

P. S.
需要停止-全局标志,停止所有正在运行的线程,如果有成功的失败
事件-对象列表,我想检查
readTimeout-我们将等待多少时间,直到读取所有数据,如果readTimeout==-1,然后停止,当我们读取任何东西

富辰阳
2023-03-14

>

  • 您可以使用消费者。assignment(),它将返回一组分区,并验证是否分配了该主题可用的所有分区。

    如果您使用的是spring-kafka项目,您可以包括spring-kafka测试依赖性,并使用下面的方法等待主题分配,但您需要有容器<代码>ContainerTestUtils。waitForAssignment(对象容器,int分区)

  •  类似资料:
    • 我有一个在Docker中运行的Cassandra,我想在数据库准备就绪时启动一个CQL脚本。我尝试检查端口以检测它何时就绪: 但是在数据库真正准备好之前就打开了端口,因此失败。如何正确检查Cassandra状态并启动脚本?提前道谢。

    • 我目前正在做一个kafka java项目。我是新来的,我发现很难理解与Kafka生产者/消费者设计相关的几个基本概念。 > 比方说,我有一个带有单个分区的主题,我有一个生产者正在写这个主题,一个消费者正在从这个主题中消费。如果我部署同一个应用程序的多个实例,每个实例将运行自己的消费者。在这种情况下,因为所有消费者都属于同一个group pId,所以消息是否会在多个实例上运行的消费者之间平均分配?

    • 我想在远程位置检查Kafka消费者的连接。 可以确定是否将使用者分配给分区。 在远程位置,我可以从Kafka代理获得有关该主题的详细信息。 但是消费者能否保证消费者能够收到消费者与主题分区匹配的消息?

    • 我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #

    • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

    • 谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但