当前位置: 首页 > 知识库问答 >
问题:

Kafka:如何使用JavaAPI从主题中删除记录?

太叔俊侠
2023-03-14

我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。

我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,这种方法只会改变主题中的偏移量,而不是从硬盘上删除所述记录。

有没有一个Java API可以从硬盘上删除记录?

共有3个答案

司寇祺
2023-03-14

Kafka主题是不可变的,这意味着您只能向它们添加新消息。没有删除本身。

然而,为了避免“磁盘耗尽”,Kafka提供了两个概念来降低主题的大小:保留策略和压缩。

保留如果你有一个你永远不需要数据的主题,你只需设置一个保留策略,不管你需要多长时间保留数据,即72小时。Kafka会自动删除超过72小时的邮件。

压缩如果您确实需要数据永远存在,或者至少长时间存在,但是您只需要最新的值,那么您可以将主题设置为压缩。一旦新消息添加了一个已经存在的密钥,这个将自动删除旧消息。

规划Kafka架构的核心部分是思考如何将数据存储在主题中。例如,如果你在Kafka主题中推送客户记录的更新,比如说客户的最后登录日期(非常做作的例子…),然后你只对最后一个条目感兴趣(因为之前的所有条目都不再是“最后”登录)。如果此分区的密钥是客户ID,并且启用了日志压缩,那么一旦用户登录并且kafka主题收到此事件,具有相同分区密钥(客户ID)的任何其他先前消息都将自动从主题中删除。

宣原
2023-03-14

我可以删除。如果linux在机器上,它会将其从硬盘中删除。当我在网上搜索时,我发现windows有一个漏洞。然而,我在windows中找不到解决这个错误的方法。如果kafka在linux机器上运行,则此代码有效。

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
       TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
       Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
       deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
       kafkaAdminClient.deleteRecords(deleteMap);
}
梁丘成和
2023-03-14

这让我一开始也有点困惑,为什么包括bin/kafka删除记录。sh可以删除,但我不能使用Java API

缺少的是你需要给Kafka夫未来打电话。get(),因为deleteRecords返回未来的映射

这是密码

在这段代码中,您需要调用entry.getValue(). get(). lowWatermark()

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();
 类似资料:
  • 有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。

  • 距今已过去数小时,话题仍未删除。 我看到了一些建议,建议我将放在我的中,然后重新启动Kafka。我试过这个。没奏效。 (为什么默认不设置这个?) 我可以关闭kafka和zookeeper,运行,然后再次启动zookeeper和kafka。但这是相当激烈的。确实应该有一些方法来说服实际上删除一个主题?

  • 我有几个Kafka的题目作为测试。现在我想通过清理我的Kafka主题列表来把它们全部除掉。我设置了变量,然后停止并重新启动zookeeper和kafka服务器。但什么也帮不了我。主题仍然存在,“标记为删除”。我读了这个问题,但没有找到任何答案。否则,这里建议手动移除任何主题。但我该怎么做呢?在故事的结尾,手动或通过命令行,我如何永久删除Kafka主题?

  • 我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话

  • 我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..

  • 我想删除所有空的Kafka主题(定期从cron)。我在文档中找不到一个这样做的命令?转到脚本: 但是,这包括已经过期的消息?在不使用消费者的情况下,如何在主题中找到实际的当前计数?