当前位置: 首页 > 知识库问答 >
问题:

如何使用JAVA API从Kafka获取每个主题的消息编号[重复]

鲜于念
2023-03-14

我想知道如何通过java api在kafka中获取每个主题的消息数量,我不知道想使用下面帖子中提到的命令行工具。知道怎么做吗?

PS:我不想通过循环KAFKA消费者流来计算计数,我试图在开始时计算这个计数(在从KAFKA消费之前)

Java,如何在阿帕奇Kafka中获取主题中的消息数

共有1个答案

公羊俊
2023-03-14

使用新的< code>KafkaConsumer,您可以使用< code>seekToBeginning(...)和< code>seekToEnd(...)并计算每个分区的最大和最小偏移量之差,并将这些数字相加。

如果进行搜索,则不会使用消息。请记住,这种搜索是懒惰的,即,您需要使用位置(...)来实际触发搜索。由于懒惰,两种寻求方法都不会返回任何内容。但是,位置(...) 将给出可用于计算的偏移量。

看见http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

 类似资料:
  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。 我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,

  • 问题内容: 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。我们如何获取主题中的消息数量? 问题答案: 从消费者的角度来看,想到此的唯一方法是实际消费消息并计数。 Kafka代理公开了自启动以来收到的消息数量的JMX计数器,但是您不知道已经清除了其中的多少。 在最常见的情况下,最好将Kafka中的消息视为无限流,而获得当前磁盘上保留的离散值并不重要。此外,在与

  • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。

  • 如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取

  • 我的Kafka消费者必须倾听多个主题。每个主题都定义了一个优先级,比如高、低和中。 消费者服务必须以这样的方式配置,例如,它有30个执行器用于处理高主题的消息,5个执行器用于处理低主题和中主题的每个消息。 如果执行程序被占用并运行当前任务,是否有方法配置消费者停止消费来自相应主题的消息?