当前位置: 首页 > 知识库问答 >
问题:

如何阅读Kafka主题中的所有记录

云宏儒
2023-03-14

我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。

我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为最早)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。

我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除了在某个地方保留另一个记录,告诉我有多少消息在主题中需要阅读之前,我停下来。

有什么好主意吗?

共有1个答案

商开济
2023-03-14

根据我的理解,您试图实现的是在您的应用程序中构建一个基于特定主题中已经存在的值的映射。

对于此任务,您可以使用Kafka Streams DSL中的Ktable,而不是手动轮询主题,它将自动构造一个可读的键值存储,该存储具有容错、启用复制并自动填充新值。

您只需在流上调用groupByKey,然后使用聚合即可完成此操作。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);

点击此处阅读更多关于Kafaka流概念的内容

然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止

Kafaka是一个消息流平台。您流式传输的任何html" target="_blank">数据都在持续更新,您可能不应该以期望在一定数量的消息之后停止消费的方式使用它。如果在您阻止消费者后有新消息进来,您将如何处理?

您在这里的具体用例是什么?可能有一个很好的方法来使用Kafaka语义本身。

 类似资料:
  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头

  • 我们从kafka向SparkStreaming发送了15张唱片,但是spark只收到了11张唱片。我用的是spark 2.1.0和kafka_2.12-0.10.2.0。 密码 bin/Kafka-console-producer . sh-broker-list localhost:9092-topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12

  • 你好,我正在写一个服务围棋和Kafka,我需要实现一个删除所有endpoint,将删除所有记录从一个特定的主题。然而,我找不到一个合适的方法来做到这一点。我使用Sarama库为Kafka。 到目前为止,我能找到实现删除所有的唯一两种方法是删除主题,这似乎不是处理这个问题的有效方法,第二种方法是使用Sarama库中的函数,但是这函数删除偏移量小于相应分区给定偏移量的记录。这意味着我必须先得到最新的偏

  • 我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。

  • 我有两个不同的主题要阅读,并将处理后的数据发布到WebService。我有一个条件,我必须完全读取来自topic1的消息,并确保如果没有来自topic1的消息,我必须读取来自topic2的消息并处理它。如果我开始从topic2读取消息并从topic1获取消息,我必须暂停处理来自topic2的消息并从topic1读取消息。 我设法使用KafkalistenerEndpointRegistry实现了这