我正在使用下面的代码从一个主题读取消息。如何在阅读邮件后删除它?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
没有办法从Kafka身上删除一条特定的信息--Kafka根本就不是设计来做这件事的。删除消息的唯一方法是将Kafka的config/server.properties
中的log.retention.hours
设置为您喜欢的值。默认值为168-意味着消息在168小时后不再保留。
如果您正在寻找一种从特定偏移量读取消息的方法--即不是每次都从头读取,请查看以下内容:http://kafka-python.readthedocs.org/en/master/apidoc/kafkaconsumer.htmlcommit()
-将读取的偏移量提交到kafkaseek_to_end()
-快进到只使用新到达的消息seek()
-移动到给定的偏移量(可能存储在kafka以外的其他地方)
有没有什么方法可以让我查看针对给定主题发送给Kafka的消息内容?说一些事情,比如查看这个主题的最后5条消息,如果可能的话。
我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头
第一,我试过了 第二,我尝试了下面的。(使用onErrorResumeNext),但取消了订阅。 (未调用onError,但调用OnComplete。因此已取消订阅) 第三,我试了下面。(带重试) 这比第一好。但没有刻录。 我想使刷新按钮,工作后错误。 我想知道 null 对不起,我的英语太差了。
欢迎来到Go的世界,让我们开始探索吧! Go是一种新的语言,一种并发的、带垃圾回收的、快速编译的语言。它具有以下特点: 它可以在一台计算机上用几秒钟的时间编译一个大型的Go程序。 Go为软件构造提供了一种模型,它使依赖分析更加容易,且避免了大部分C风格include文件与库的开头。 Go是静态类型的语言,它的类型系统没有层级。因此用户不需要在定义类型之间的关系上花费时间,这样感觉起来比典型的面向对
我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除