我一直在关注Kafka消费类课程。我可以将主题作为列表对象传递。我指的是下面的文章https://docs.confluent.io/current/clients/java.html,但我需要知道,一旦消费者类订阅了主题,我如何知道哪个主题中有记录。有没有办法找到答案?代码如下:
public abstract class ConsumeLoop implements Runnable {
private final KafkaConsumer<K, V> consumer;
private final List<String> topics;
private final CountDownLatch shutdownLatch;
public BasicConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
this.consumer = consumer;
this.topics = topics;
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ConsumerRecord<K, V> record);
public void run() {
try {
consumer.subscribe(topics); --> Consuming list of topics
while (true) {
ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE); --> Which topic is returning the records?
records.forEach(record -> process(record));
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
}
ConsumerRecords
中的partitions()方法将返回一组TopicPartition
s:
分区-获取包含此记录集中的记录的分区。
然后,您可以迭代该集以获取主题()
名称和分区()
数字,这取决于您需要什么。例如:
for (TopicPartition tp : records.partitions()) {
System.out.println("Got " + records.records(tp).size() + " records "
+ "from topic:partition " + tp.topic() + ":" + tp.partition());
}
你好,我正在写一个服务围棋和Kafka,我需要实现一个删除所有endpoint,将删除所有记录从一个特定的主题。然而,我找不到一个合适的方法来做到这一点。我使用Sarama库为Kafka。 到目前为止,我能找到实现删除所有的唯一两种方法是删除主题,这似乎不是处理这个问题的有效方法,第二种方法是使用Sarama库中的函数,但是这函数删除偏移量小于相应分区给定偏移量的记录。这意味着我必须先得到最新的偏
问题内容: 对于Java对象,有没有办法告诉哪个线程(或null)当前 拥有其监视器?或者至少是一种方法来判断当前线程是否拥有它? 问题答案: 我自己找到了一些答案。要测试当前线程是否拥有 监视器,是否 存在! exists! 这确实非常快(亚微秒),并且从1.4开始就可用。 通常,要测试哪个线程(或线程ID)持有该锁,可以 对 classes (thanks @amicngh). 有一些注意事项
有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。
问题内容: 我能够运行gensim的LDA代码,并获得各自关键词的前10个主题。 现在,我想更进一步,通过查看将哪些文档归类到每个主题中,来了解LDA算法的准确性。gensim LDA有可能吗? 基本上我想做这样的事情,但是在python中并使用gensim。 具有主题模型的LDA,如何查看不同文档属于哪些主题? 问题答案: 使用主题的概率,您可以尝试设置一些阈值并将其用作聚类基线,但是我敢肯定,
我想使用PublishSubject创建一个广播系统,一个后台任务将轮询一些endpoint,并定期使用该主题广播结果。我希望在第一个订阅者订阅主题时开始轮询,并在没有更多订阅者时停止轮询。如果有新订阅者订阅,则应继续轮询。