当前位置: 首页 > 知识库问答 >
问题:

Kafka从主题中删除记录时不使用偏移量,而是按记录的一个字段

甄志
2023-03-14

假设我有一个名为“批处理”的主题,有一个分区,我向它发布了数百万条记录以供处理。我有一个由3人组成的消费者小组来处理这些数以百万计的记录。我遇到了这样一种情况:我不再需要处理满足特定标准(如age)的特定消息子集

如何以编程方式从主题中删除这些消息。就像我在用户界面中点击一个“取消”按钮,它应该从主题中删除那些记录的子集

我知道我可以通过运行带有偏移量的命令行来删除消息:-https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

还有Java API,但同样是通过偏移量:

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#deleteRecords-爪哇。util。地图组织。阿帕奇。Kafka。客户。管理删除记录-

删除偏移量小于相应分区给定偏移量的记录

但是在我的情况下,我不能使用偏移量,因为我只需要删除某些记录,而不是所有小于给定偏移量的记录


共有2个答案

燕扬
2023-03-14

你不能,Kafka不是设计用来像数据库一样使用的,它实际上是一个不可变的提交日志。删除记录工具主要用于管理任务。

有一个例外,那就是如果使用日志压缩。如果你有一个压缩的主题,你可以通过向主题发布一条带有NULL值的记录来删除键的值。压缩主题通常像数据库提交日志一样使用,您可以将它们读入某个下游服务,在那里它像表一样具体化。NULL值应解析为记录删除。

因此,在您的用例中,您可以将主题具体化为一个系统,该系统针对SELECT key FROM TABLE WHERE age这样的查询进行了优化

柳宪
2023-03-14

我需要指出的主要事情是,您不应该将Kafka中的数据视为数据库中的数据。Kafka没有被设计成以这样的方式工作(例如:当我点击X按钮时,Y记录将被删除)。

相反,您应该将主题视为永无止境的数据流。为Kafka主题生成的每个记录都将由消费者独立消费和处理。

将主题视为流会为您提供不同的解决方案:

您可以使用第二个主题,其中包含过滤结果!

Streaming Diagram
                            ___ Topic A ____
--  Produced Messages -->  |                |      _______________________
                           |________________| --> |                       |
                                                  | Filtering Application |
                            ___  Topic B ___      |                       |
                           |                | <-- |_______________________|
<-- Consumed Messages --   |________________|
 类似资料:
  • 有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。

  • 我们有一个Kafka流聚合拓扑。我们需要控制changeLog主题的大小,以减少Kafka存储成本。因此,我们在拓扑中使用transformer(DSL API)调度标点符号,该标点符号使用KeyValueStore.delete()从stateStore中删除旧记录。 我能够验证在删除之后,在进一步调度的标点符号触发器中,删除的键不在状态存储中。但是它是否也会从changeLog主题中删除该记录

  • 我正在寻找一种从Kafka主题中删除(完全删除)已消费记录的方法。我知道有几种方法可以做到这一点,例如更改主题的保留时间或删除Kafka logs文件夹。但我要寻找的是一种使用Java API删除某个主题的一定数量记录的方法,如果可能的话。 我试过测试AdminClient API,特别是AdminClient。deleteRecords(recordsToDelete)方法。但如果我没弄错的话,

  • 你好,我正在写一个服务围棋和Kafka,我需要实现一个删除所有endpoint,将删除所有记录从一个特定的主题。然而,我找不到一个合适的方法来做到这一点。我使用Sarama库为Kafka。 到目前为止,我能找到实现删除所有的唯一两种方法是删除主题,这似乎不是处理这个问题的有效方法,第二种方法是使用Sarama库中的函数,但是这函数删除偏移量小于相应分区给定偏移量的记录。这意味着我必须先得到最新的偏

  • 我是流媒体代理(如Kafka)的新手,来自排队消息系统(如JMS、Rabbit MQ)。 我从Kafka文档中读到,消息作为记录存储在Kafka分区的偏移量中。消费者从偏移量读取。 消息和记录有什么区别[多个/部分消息是否构成记录?] 当消费者从偏移量读取时,消费者是否有可能读取部分消息?消费者是否需要基于某种逻辑将这些对等消息串起来? 或 1条消息=1条记录=1个偏移量 之所以会出现这个问题,是

  • 我试图创建一个Spring引导应用程序(java),它必须能够通过给它的偏移数和分区从Kafka主题删除消息。我一直在研究java或Spring Boot包类可以做到这一点,但我只发现了这样的东西:从Apache Kafka主题中删除消息有一个java kafka客户端有一个方法来删除所有消息之前一个偏移,但我只是一个删除一个。这是可能的吗? 提前感谢