当前位置: 首页 > 知识库问答 >
问题:

计算 kafka 主题中存储的消息数

阮疏珂
2023-03-14

我使用的是0.9.0.0版本的Kafka,我想在不使用管理脚本Kafka-console-consumer.sh的情况下计算主题中的消息数。

我已经尝试了答案Java“How to get number of messages in a topic in apache kafka”中的所有命令,但都没有结果。有人能帮我吗?

共有3个答案

太叔昊苍
2023-03-14

您可以使用以下命令对所有计数求和:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2:9092>>... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
楚意
2023-03-14

从技术上讲,您可以简单地使用主题中的所有消息并计算它们:

例子:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*

但是kafka.tools.GetOffsetShell方法会给你偏移量,而不是主题中实际的消息数量。这意味着如果主题被压缩,如果你通过消费消息或读取偏移量来计算消息,你会得到两个不同的数字。

主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics

楚举
2023-03-14

您可以尝试执行以下命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

然后,汇总每个分区的所有计数。

更新:Java实现

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            Long beginningOffset = entry.getValue();
            Long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}
 类似资料:
  • 我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?

  • 本文向大家介绍kafka的消息存储?相关面试题,主要包含被问及kafka的消息存储?时的应答技巧和注意事项,需要的朋友参考一下 kafka的消息存储在磁盘上,一个kafka topic分为一个或多个partition,每个partition单独存储自己的消息数据 partition将数据记录到.log文件中,为了避免文件过大影响查询效率,将文件分段处理 记录消息到.log文件中的同时,会记录消息o

  • 正如标题中所说的,我想在我的主题中获得一些记录,但我无法找到一个使用kafka-python库的解决方案。有人知道吗?

  • 我试图了解如何跟踪Kafka的信息摄取。 我们现在遵循的工作流程是清除主题中的所有消息,然后我们用代码更改重新摄取。我需要知道那些代码更改有多成功。在当前状态下,我正在使用Kafka工具,手动刷新消息总数,并将结果保存在csv中,我知道这是不可持续的长期。 你对自动获取Kafka主题中的消息计数有什么建议?理想情况下,我想击中的主题一分钟一分钟的频率,并得到计数,以及窗口的时间,如1天等。

  • 我正在测试Kafka主题的工作,但我不明白删除是如何工作的。 我创建了一个简单的主题 在此之后,我创建了一个生产者,并发送了一些消息。消费者没有问题地接收到消息。但我预计,一分钟后,如果重复了消费者,它不会显示消息,因为它们肯定已经被删除了。但这种行为不会发生。 如果我用ksql创建一个查询,那也是一样的。消息总是会出现。 我想我不明白删除是怎么回事。 3)制作人留言 4)消费者 消费者会显示这四

  • 因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除