我想知道如何通过java api在kafka中获取每个主题的消息数量,我不知道想使用下面帖子中提到的命令行工具。知道怎么做吗?
PS:我不想通过循环KAFKA消费者流来计算计数,我试图在开始时计算这个计数(在从KAFKA消费之前)
Java,如何在阿帕奇Kafka中获取主题中的消息数
使用新的< code>KafkaConsumer,您可以使用< code>seekToBeginning(...)和< code>seekToEnd(...)并计算每个分区的最大和最小偏移量之差,并将这些数字相加。
如果进行搜索,则不会使用消息。请记住,这种搜索是懒惰的,即,您需要使用位置(...)
来实际触发搜索。由于懒惰,两种寻求方法都不会返回任何内容。但是,位置(...)
将给出可用于计算的偏移量。
看见http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。 我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,
问题内容: 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。我们如何获取主题中的消息数量? 问题答案: 从消费者的角度来看,想到此的唯一方法是实际消费消息并计数。 Kafka代理公开了自启动以来收到的消息数量的JMX计数器,但是您不知道已经清除了其中的多少。 在最常见的情况下,最好将Kafka中的消息视为无限流,而获得当前磁盘上保留的离散值并不重要。此外,在与
我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。
如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;