当前位置: 首页 > 知识库问答 >
问题:

有没有可能到达后的消费者Kafka信息?

鲁望
2023-03-14

我想从Kafka的主题消费事件后,他们到达的时间。我希望使用事件的时间在消息的有效负载中。在Kafka那里有可能实现那样的事情吗?它的缺点是什么?

实际示例:一条消息M在12:10产生,在12:11到达我的Kafka主题,我希望消费者在12:41(到达后30分钟)轮询它

共有1个答案

古棋
2023-03-14

Kafaka的所有主题的默认保留期为7天。因此,您可以随时消耗最多一周的数据,如果您经常这样做,缺点是网络饱和。

如果您希望使用不在最新偏移量的数据,那么对于任何新的使用者组,您都可以设置auto.offset.reset=aristy。否则,对于现有的组,您需要使用kafka-consumer-groups--reset命令来重新使用已经使用的记录。

有时您可能希望从主题的开头开始,例如,如果您有一个压缩的主题,以便在主题中重建数据的“增量”--查找“流/表对偶”(stream/Table duality)。

我希望使用事件的时间在消息的有效负载中

因为KIP-32每个消息都有一个有效负载之外的时间戳,顺便说一句

我想让消费者投票...(到达后30分钟)

 类似资料:
  • 我有一个kafka主题,3个分区,只有一个带批处理的消费者。我在消费者方面使用的是spring kafka和以下消费者道具: 即使队列中有数千条消息(GBs数据)在等待,kafka consumer在每次轮询中也会收到大约10条消息(总大小约为1MB)。使用者应该获取(在我的示例中为15MB)或(在我的示例中为10000)的批处理。有什么问题?

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。