当前位置: 首页 > 知识库问答 >
问题:

有没有一种方法可以从一个主题中删除所有的数据,或者在每次运行之前删除主题?

易阳云
2023-03-14

有没有一种方法可以从一个主题中删除所有的数据,或者在每次运行之前删除主题?

我可以修改kafkaconfig.scala文件以更改logretentionhours属性吗?有没有一种方法可以在消费者阅读后立即删除这些消息?

我正在使用生产者从某个地方获取数据,并将数据发送到消费者消费的特定主题,我可以在每次运行时删除该主题中的所有数据吗?我只想在主题中每次都有新的数据。有什么方法可以重新初始化主题吗?

共有1个答案

南门朗
2023-03-14

<罢工> 还不认为它得到支持。 看看这个JIRA问题“添加删除主题支持”。

手动删除:

  1. 关闭群集
  2. 清除kafka日志目录(由kafka配置文件中的log.dir属性指定)以及zookeeper数据
  3. 重新启动群集
    null

另外,是否有一种方法,当用户阅读消息时,消息就会被删除?

从Kafaka的文件:

Kafka集群在一段可配置的时间内保留所有已发布的消息(无论它们是否已被使用)。例如,如果将日志保留设置为两天,那么在消息发布后的两天内,该消息可供使用,之后将丢弃该消息以释放空间。Kafka的性能相对于数据大小是有效恒定的,因此保留大量数据不是问题。

您还可以在那里找到用于管理消费者端的偏移量的示例代码。

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
 类似资料:
  • 在正确配置日志保留期后,kafka主题数据不会在windows中被删除。 以下错误记录在Kafka服务器日志中。 原因:java.nio.file.文件系统异常: \tmp\kafka-logs\TopicName-2\0000000000000007262.log - 计划任务“Kafka日志保留”(Kafka.utils.Kafka调度程序)中未捕获的异常

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 你好,我正在写一个服务围棋和Kafka,我需要实现一个删除所有endpoint,将删除所有记录从一个特定的主题。然而,我找不到一个合适的方法来做到这一点。我使用Sarama库为Kafka。 到目前为止,我能找到实现删除所有的唯一两种方法是删除主题,这似乎不是处理这个问题的有效方法,第二种方法是使用Sarama库中的函数,但是这函数删除偏移量小于相应分区给定偏移量的记录。这意味着我必须先得到最新的偏

  • 我有两个表,一个有复合主键,另一个有外键。它们都设置为删除时级联。问题是,当我从主表中删除一个复合键集“name:John date:02.02.2018”时,所有John行都会从带有外键的表中删除,但是可以有一个集合“name:John date:04.04.2018”以及日期为02.02的所有行。也删除了一些行,如何才能使其删除仅匹配外键集的行? 更新:

  • 问题内容: 我希望用户输入时间,例如12:00,但是我需要弄清楚一些事情,我实在是太可惜了。 我可以将文字限制为5个字符吗? 我可以在代码中嵌入冒号,以便用户无法删除它吗? 最后,我可以接受该代码并验证它只是数字(当然忽略了冒号) 问题答案: 答案是使用JFormattedTextField和MaskFormatter。 例如: Java编译器将要求您在创建MaskFormatter时捕获或抛出P

  • 本地gradle缓存存储Maven/gradle依赖项的副本。如何清除分级缓存?介绍如何清除整个缓存,但不清除单个包。 有没有一种简单的方法可以从本地gradle缓存中删除一个包?例如,在积极开发库时,这将是有用的。要测试一个小的库更改,我当前必须从文件系统中清除整个缓存,这样就不会使用库的旧缓存版本。