我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。
RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);
我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes,log.cleanup.policy=delete)的建议
因为我只想删除消费者使用的数据。在这个解决方案中,我还删除了不被消耗的数据。
你有什么建议?
你没做错什么。您提供的代码可以工作,我已经测试过了。以防万一我忽略了您的代码中的某些内容,我的代码是:
public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
kafkaAdminClient.deleteRecords(deleteMap);
}
我使用过组:“org.apache.kafka”,名称:“kafka-clients”,版本:“2.0.0”
因此,检查您是否针对右分区(0表示第一个分区)
还有一个你可以考虑的选择。使用cleanup.policy=compact如果您的消息键重复,您可以从中受益。不仅仅是因为该密钥的旧消息会自动删除,还可以利用空负载的消息会删除该密钥的所有消息这一事实。只是不要忘记将delete.retention.ms和min.compaction.lag.ms设置为足够小的值。在这种情况下,您可以使用一个消息,然后为相同的密钥生成空有效负载(但是使用这种方法要谨慎,因为这样您可以删除您没有使用的消息(使用该密钥))
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费
由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。
我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认