当前位置: 首页 > 知识库问答 >
问题:

确定哪个主题在Kafka consumer中有记录

寇桐
2023-03-14

我一直在关注Kafka消费类课程。我可以将主题作为列表对象传递。我指的是下面的文章https://docs.confluent.io/current/clients/java.html,但我需要知道,一旦消费者类订阅了主题,我如何知道哪个主题中有记录。有没有办法找到答案?代码如下:

public abstract class ConsumeLoop implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
  try {
      consumer.subscribe(topics);  --> Consuming list of topics

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);  --> Which topic is returning the records?
        records.forEach(record -> process(record));    
     }  
   } catch (WakeupException e) {
     // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
       consumer.close();
      shutdownLatch.countDown();
    }
  } 
}

共有1个答案

宋高谊
2023-03-14

ConsumerRecords中的partitions()方法将返回一组TopicPartitions:

分区-获取包含此记录集中的记录的分区。

然后,您可以迭代该集以获取主题()名称和分区()数字,这取决于您需要什么。例如:

   for (TopicPartition tp : records.partitions()) {
     System.out.println("Got " + records.records(tp).size() + " records "
       + "from topic:partition " + tp.topic() + ":" + tp.partition());
   }
 类似资料:
  • 你好,我正在写一个服务围棋和Kafka,我需要实现一个删除所有endpoint,将删除所有记录从一个特定的主题。然而,我找不到一个合适的方法来做到这一点。我使用Sarama库为Kafka。 到目前为止,我能找到实现删除所有的唯一两种方法是删除主题,这似乎不是处理这个问题的有效方法,第二种方法是使用Sarama库中的函数,但是这函数删除偏移量小于相应分区给定偏移量的记录。这意味着我必须先得到最新的偏

  • 问题内容: 对于Java对象,有没有办法告诉哪个线程(或null)当前 拥有其监视器?或者至少是一种方法来判断当前线程是否拥有它? 问题答案: 我自己找到了一些答案。要测试当前线程是否拥有 监视器,是否 存在! exists! 这确实非常快(亚微秒),并且从1.4开始就可用。 通常,要测试哪个线程(或线程ID)持有该锁,可以 对 classes (thanks @amicngh). 有一些注意事项

  • 有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。

  • 问题内容: 我能够运行gensim的LDA代码,并获得各自关键词的前10个主题。 现在,我想更进一步,通过查看将哪些文档归类到每个主题中,来了解LDA算法的准确性。gensim LDA有可能吗? 基本上我想做这样的事情,但是在python中并使用gensim。 具有主题模型的LDA,如何查看不同文档属于哪些主题? 问题答案: 使用主题的概率,您可以尝试设置一些阈值并将其用作聚类基线,但是我敢肯定,

  • 我想使用PublishSubject创建一个广播系统,一个后台任务将轮询一些endpoint,并定期使用该主题广播结果。我希望在第一个订阅者订阅主题时开始轮询,并在没有更多订阅者时停止轮询。如果有新订阅者订阅,则应继续轮询。