我正在测试Kafka主题的工作,但我不明白删除是如何工作的。
我创建了一个简单的主题
retention.ms = 60000
segment.ms = 60000
cleanup.policy=delete.
在此之后,我创建了一个生产者,并发送了一些消息。消费者没有问题地接收到消息。但我预计,一分钟后,如果重复了消费者,它不会显示消息,因为它们肯定已经被删除了。但这种行为不会发生。
如果我用ksql创建一个查询,那也是一样的。消息总是会出现。
我想我不明白删除是怎么回事。
./kafka-topics --create --zookeeper localhost:2181 --topic test --
replication-factor 2 --partitions 1 --config "cleanup.policy=delete" --
config "delete.retention.ms=60000" --config "segment.ms=60000"
./kafka-avro-console-producer --broker-list broker:29092 --topic test--
property parse.key=true --property key.schema='{"type":"long"}' --property
"key.separator=:" --property value.schema='{"type": "record","name":
"ppp","namespace": "test.topic","fields": [{"name": "id","type": "long"}]}'
3)制作人留言
1:{"id": 1}
2:{"id": 2}
4:{"id": 4}
5:{"id": 5}
4)消费者
./kafka-avro-console-consumer \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--topic test--from-beginning --property print.key=true
消费者会显示这四条消息。
当邮件被实际删除时?
使用kafka-topics--zookeeper localhost:2181-alter-topic test--config retention.ms=60000
更改Ajay Srivastava上面提到的retention.ms
并再次测试。
因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除
我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。
在kafka中,我在中将保留策略设置为3天 主题将设置为(48h)。 但是,文件夹/tmp/kafka-logs中仍然有旧数据,并且没有任何数据被删除。我等了几个小时才换了那些房产。
向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?
但是,这将导致以下消息: 如何删除此主题?
我想删除所有空的Kafka主题(定期从cron)。我在文档中找不到一个这样做的命令?转到脚本: 但是,这包括已经过期的消息?在不使用消费者的情况下,如何在主题中找到实际的当前计数?