当前位置: 首页 > 知识库问答 >
问题:

删除Apache kafka中的主题消息

秦鸿羽
2023-03-14

我正在测试Kafka主题的工作,但我不明白删除是如何工作的。

我创建了一个简单的主题

retention.ms = 60000
segment.ms = 60000
cleanup.policy=delete.

在此之后,我创建了一个生产者,并发送了一些消息。消费者没有问题地接收到消息。但我预计,一分钟后,如果重复了消费者,它不会显示消息,因为它们肯定已经被删除了。但这种行为不会发生。

如果我用ksql创建一个查询,那也是一样的。消息总是会出现。

我想我不明白删除是怎么回事。

./kafka-topics --create --zookeeper localhost:2181 --topic test -- 
  replication-factor 2 --partitions 1 --config "cleanup.policy=delete" -- 
  config "delete.retention.ms=60000" --config "segment.ms=60000"
./kafka-avro-console-producer --broker-list broker:29092 --topic test-- 
  property parse.key=true --property key.schema='{"type":"long"}' --property 
  "key.separator=:" --property value.schema='{"type": "record","name": 
  "ppp","namespace": "test.topic","fields": [{"name": "id","type": "long"}]}'

3)制作人留言

1:{"id": 1}
 2:{"id": 2}
 4:{"id": 4}
 5:{"id": 5}

4)消费者

  ./kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic test--from-beginning --property print.key=true

消费者会显示这四条消息。

当邮件被实际删除时?

共有1个答案

壤驷凯
2023-03-14

使用kafka-topics--zookeeper localhost:2181-alter-topic test--config retention.ms=60000更改Ajay Srivastava上面提到的retention.ms并再次测试。

 类似资料:
  • 因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 在kafka中,我在中将保留策略设置为3天 主题将设置为(48h)。 但是,文件夹/tmp/kafka-logs中仍然有旧数据,并且没有任何数据被删除。我等了几个小时才换了那些房产。

  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?

  • 但是,这将导致以下消息: 如何删除此主题?

  • 我想删除所有空的Kafka主题(定期从cron)。我在文档中找不到一个这样做的命令?转到脚本: 但是,这包括已经过期的消息?在不使用消费者的情况下,如何在主题中找到实际的当前计数?