当前位置: 首页 > 知识库问答 >
问题:

调用deleteRecords Kafka Admin Client Java API时,不会从文件系统中删除消息

裴俊豪
2023-03-14

我试图使用Java Admin Client API的delete Records方法从我的kafka主题中删除消息。以下是我尝试的步骤


    1. I pushed 20000 records to my TEST-DELETE topic
    2. Started a console consumer and consumed all the messages
    3. Invoked my java program to delete all those 20k messages
    4. Started another console consumer with a different group id. This consumer is not receiving any of the deleted messages

当我检查文件系统时,我仍然可以看到占用磁盘空间的所有20K记录。我的意图是删除这些记录永远从文件系统太。


Topic:TEST-DELETE       PartitionCount:4        ReplicationFactor:1     Configs:cleanup.policy=delete
        Topic: TEST-DELETE    Partition: 0      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 1      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 2      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 3      Leader: 0     Replicas: 0       Isr: 0


    log.retention.hours=24
    log.retention.check.interval.ms=60000
    log.cleaner.delete.retention.ms=60000
    file.delete.delay.ms=60000
    delete.retention.ms=60000
    offsets.retention.minutes=5
    offsets.retention.check.interval.ms=60000
    log.cleaner.enable=true
    log.cleanup.policy=compact,delete

下面给出了我的删除代码


public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartions) {

        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();

        allTopicPartions.entrySet().forEach(topicDetails -> {

            String topicName = topicDetails.getKey();
            Map<Integer, Long> value = topicDetails.getValue();

            value.entrySet().forEach(partitionDetails -> {

                if (partitionDetails.getValue() != 0) {
                    recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
                            RecordsToDelete.beforeOffset(partitionDetails.getValue()));
                }
            });
        });

        DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);

        Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();

        lowWatermarks.entrySet().forEach(entry -> {
            try {
                logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
                        + entry.getValue().get().lowWatermark());
            } catch (Exception ex) {

            }
        });

    }

下面给出了我的java程序的输出



2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 1 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 0 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 3 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 2 5000

我的意图是从文件系统中删除消耗的记录,因为我的kafka代理的存储空间有限。

    null

共有1个答案

曾丰茂
2023-03-14

处理此问题的建议方法是为您感兴趣的主题设置retention.ms和相关配置值。这样,您就可以定义Kafka在删除数据之前将存储数据的时间,以确保在数据从Kafk集群中删除之前,您的所有下游消费者都有机会下拉数据。

但是,如果您仍然希望强制Kafka基于字节进行删除,则有log.retention.bytesretention.bytes配置值。第一个是集群范围的设置,第二个是特定于主题的设置,默认情况下,它接受第一个设置的任何设置,但您仍然可以针对每个主题重写它。retention.bytes数字是每个分区强制执行的,因此应该将其乘以主题分区的总数。

但是,请注意,如果有一个失控的生产者突然开始生成大量数据,并且将其设置为硬字节限制,则可能会清除集群中一整天的数据,只剩下最后几分钟的数据,甚至可能在有效的使用者从集群中拉下数据之前。这就是为什么将Kafka主题设置为基于时间的保留要好得多,而不是基于字节的。

您可以在官方的Kafka文档中找到配置属性及其解释:https://Kafka.apache.org/documentation/

 类似资料:
  • 我有以下错误,詹金斯无法从工作区删除一些文件。我可以手动删除文件,但如何使其自动化?我在构建之前选中了删除工作区,并在“要删除和包含的文件模式”中插入了**/tellar.tar.gz,但仍然不会这样做。 正在删除项目工作区。。。 清洁本地目录。 hudson.util.IOException2:远程文件操作失败: /local/hudson/workspace/CITI_PATCH_LATE在h

  • 目标是为每个行添加具有修改时间的列。 鉴于 预期的 我写了一个函数来获取修改时间 修改时间:长=1580708401253 ...但它在查询中不起作用 组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段54.0中的任务0失败4次,最近的失败:阶段54.0中的任务0.3丢失(TID 408,srs-hdp-s1.dev.kontur.ru,executor 3):org。阿帕

  • 我在servicemix中使用camel 2.10.7非常成功地将文件从本地文件系统馈送到我的应用程序。 文件应保留在文件系统上,因此我使用这样的配置。 from uri=“file:../ange data/vessers?noop=true 如果我触摸/更新文件系统上的文件,这将非常有效。 唯一的问题是:如何在我的Java代码中检测到文件已被其他人或进程从文件系统中删除? 通过研究手册页面,找

  • 但我的却不是这样。他们正在处理OK,并将正在编码的视频保存到正确的S3存储桶中。但它们没有像应该的那样删除队列项。 我试过用各种方法发送报头,包括... 我在页面上没有输出,但尝试调用了ob_start();在起始和ob_end_flush()处;在发送报头之后,甚至尝试在任何处理之前一开始就直接执行报头。没有任何工作,消息仍然在飞行中,并在其可见性超时结束后重新传递。 救命啊! 多谢了。

  • 我正在使用gradle构建一个Spring Boot应用程序,我希望从war中删除文件,因为该文件将从外部加载(它运行在tomcat容器中,而不是嵌入式的)。 我查看了StackOverflow和Gradle文档,试图找出该做什么,但我不知道该绑定到哪个阶段,以及在创建war之前还是之后排除该文件。处理文件似乎也有多种方法。 我相信Maven使用作为等价物。

  • 当我更改ListViewAdapter时,我的项目将无法从ListView中清除,并且我有一些空白条目。 她是我的密码: