当前位置: 首页 > 知识库问答 >
问题:

阅读Kafka留言后如何删除

秦涵映
2023-03-14

我正在使用下面的代码从一个主题读取消息。如何在阅读邮件后删除它?

from kafka import KafkaConsumer


    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

共有1个答案

湛嘉歆
2023-03-14

没有办法从Kafka身上删除一条特定的信息--Kafka根本就不是设计来做这件事的。删除消息的唯一方法是将Kafka的config/server.properties中的log.retention.hours设置为您喜欢的值。默认值为168-意味着消息在168小时后不再保留。

如果您正在寻找一种从特定偏移量读取消息的方法--即不是每次都从头读取,请查看以下内容:http://kafka-python.readthedocs.org/en/master/apidoc/kafkaconsumer.html
commit()-将读取的偏移量提交到kafka
seek_to_end()-快进到只使用新到达的消息
seek()-移动到给定的偏移量(可能存储在kafka以外的其他地方)

 类似资料:
  • 有没有什么方法可以让我查看针对给定主题发送给Kafka的消息内容?说一些事情,比如查看这个主题的最后5条消息,如果可能的话。

  • 我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头

  • 第一,我试过了 第二,我尝试了下面的。(使用onErrorResumeNext),但取消了订阅。 (未调用onError,但调用OnComplete。因此已取消订阅) 第三,我试了下面。(带重试) 这比第一好。但没有刻录。 我想使刷新按钮,工作后错误。 我想知道 null 对不起,我的英语太差了。

  • 欢迎来到Go的世界,让我们开始探索吧! Go是一种新的语言,一种并发的、带垃圾回收的、快速编译的语言。它具有以下特点: 它可以在一台计算机上用几秒钟的时间编译一个大型的Go程序。 Go为软件构造提供了一种模型,它使依赖分析更加容易,且避免了大部分C风格include文件与库的开头。 Go是静态类型的语言,它的类型系统没有层级。因此用户不需要在定义类型之间的关系上花费时间,这样感觉起来比典型的面向对

  • 我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除