当前位置: 首页 > 知识库问答 >
问题:

如何从Kafka主题中检索最新消息

穆宏胜
2023-03-14

我只是Kafka的新手,我有个问题:

我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。

当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:(

    // Start application


    // Get latest message in topic 'A' then do some LOGIC
    if (exist latest message) {
          //Print latest message
    }

共有1个答案

慕容文昌
2023-03-14

消费者存储其偏移量(即上次读取的位置)。重启后,他们继续从这一点读取。这种行为是故意的。

当由于某种原因(例如,它是一个新的消费群体或偏移量已过期)尚未知道此特定消费群体的偏移量时,将使用偏移量重置属性,但通常有两个选项-a)从头开始重新读取b)开始侦听新消息,忘记以前的一切。

有很多方法可以实现您所描述的内容,但它们并不简单且不推荐(一个微不足道的方法:消费者消息,同时跳过它们,直到您到达分区EOF)

也许Kafka不是解决这个问题的正确工具。

 类似资料:
  • 我试图使用kafka-node从kafka主题读取压缩消息。 问题是,最近插入的消息留在EOL上方,在插入其他消息之前无法访问。实际上,EOL和高水位偏移之间存在间隙,这会阻止读取最新消息。原因尚不清楚。 已使用创建主题 主题中产生了许多关键值。有些钥匙是一样的。 这是插入的键和值 然后请求主题键集。 有一个高水位偏移量,表示最新的值10。然而,消费者看到的偏移值只有7。不知何故,压缩阻止了消费者

  • 目前我正在做一个项目,我需要从一个表中提取最新的数据用于报告目的。下面是示例表结构:- 我使用下面的SQL查询并能够提取数据。 但问题是真正的表非常大。大约有85k行,这个查询需要一些时间。还有其他更好的方法吗。我正在使用Oracle 11g R2。请建议 这是SQLfiddle链接http://sqlfiddle.com/#!4/b3fe1/8

  • 我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话

  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?

  • 如何使用Java从MongoCollection检索最近添加的文档?大多数现有参考文献描述了如何在v2中实现这一点。在v3.3中如何做到这一点? 我想这与find(Bson filter)方法有关。如何指定Bson过滤器,如何将可查找表转换为文档?

  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测