当前位置: 首页 > 知识库问答 >
问题:

如何消费来自Kafka主题的特定消息

翟奇逸
2023-03-14

有人能帮我弄清楚这件事吗。

谢了!

共有1个答案

裴展
2023-03-14

只要你把数据保存在Kafka(即主题保持期),你就可以随时阅读相同的主题。

因此,如果您想处理没有在Elasticsearch中索引的数据,您可以简单地重读主题并应用反向筛选器来获取所有没有在Elasticsearch中结束的消息。为此,我建议使用不同的消费者组ID。

当然,您也可以在将主题加载到Elasticsearch的同时将未加载到Elasticsearch中的消息写入新主题(即,在该日期内一次性将其放入Elasticsearch和新主题中)。但有些数据要存储两次。因此,对于以后处理非索引数据,您有一个空间/时间权衡:假设90%的数据都在Elasticsearch中结束,那么复制10%的数据并在以后加快处理这些数据可能是值得的(您只需要阅读10倍小的新主题)。如果只有10%的数据被索引,那么重复90%的数据似乎是一种浪费,只节省了10%的读取开销。

 类似资料:
  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 我必须记录消费者在SpringKafka中花费的时间。由于kafkaListener方法对每条消息都执行,因此在那里放置一个记录器是行不通的。此外,有时一些信息会丢失,而不是被消费者消费掉。我应该把记录器放在哪里,以找出消费者启动后的弹性时间。使用者不会退出或关闭,其轮询将无限期进行

  • 如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。

  • D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml