当前位置: 首页 > 知识库问答 >
问题:

在Java中获取Kafka未消费消息计数

万选
2023-03-14

共有1个答案

施俊驰
2023-03-14

还有另一种替代方法

Kafka消费者有API来获取主题每个分区的endpoint

List partitions = new ArrayList<>();
        for (PartitionInfo p : parts) {
            partitions.add(new TopicPartition(topic, p.partition()));
        }
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);

对于每个主题分区,u可以得到最近提交的偏移量。通过使用这两个数字,你可以很容易地得到未消耗的滞后

for (TopicPartition tp : offsets.keySet()) {
            OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(tp.topic(), tp.partition()));
            Long lag = commitOffset == null ? offsets.get(tp) : offsets.get(tp) - commitOffset.offset();            
        }
 类似资料:
  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我的Spring/java消费者无法访问生产者生成的消息。但是,当我从控制台/终端运行消费者时,它能够接收Spring/java生产者生成的消息。 消费者配置: 监听器配置: Kafka听众: 消费者应用: 测试用例1:通过 我启动了我的Spring/ java生产者并从控制台运行消费者。当我生成消息表单创建者时,我的控制台使用者能够访问该消息。 测试用例2:失败:我启动了spring/java消

  • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。