当前位置: 首页 > 知识库问答 >
问题:

如何使用kafka流保持kafka主题中关键字的N个最新值

闾丘选
2023-03-14

假设我使用kafka streams(kafka-streams-scala库,版本2.2.0)。

我想出了一种可能的方法:创建流和可变映射,然后使用stream.foreach来跟踪每个键的N个最近值。

val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")

val map = scala.collection.mutable.Map[String, List[MyObject]]

stream.foreach((k, v) =>  {
  //update map
})

我的问题是是否有更好的方法来实现这一点--或者使用streams API,或者至少不使用可变映射。

共有1个答案

仲君昊
2023-03-14

所以我需要像KTable或GlobalKTable这样的东西,但它们只保留一个值。

继续使用ktable(或globalktable),但使用结构化值和/或集合作为“值”。Kafka中没有强制要求您将消息值仅限制为基元数据类型(如integerstring)。

想一想:kstream > list。在这里,每个消息都属于一个特定的用户(由userid键标识),并且每个消息都有与该用户关联的0个、1个或多个ClickEvent的列表。这“只是工作”,您只需要为您想要使用的数据类型拥有适当的serdes(序列化器/反序列化器)。

例如,https://github.com/confluentinc/kafka-streams-examples中的CustomStreamTableJoin示例(直接链接到Apache Kafka V2.2的V5.2.1示例)使用pair类在Kafka的消息值中存储一个元组,并且它附带了pairserde。对于存储值的集合也可以这样做(开发人员正在这样做),比如列表 ,正如您在自己的用例中提到的那样。

我需要保留几个最近的key值在kafka主题使用kafka流。[...]我想出了一种可能的方法:创建流和可变映射,[...]

您不需要使用映射。Kafka消息中已经可以使用该键,因此您只需要为消息值提供一个类似列表的数据类型。

或者至少没有可变映射。

您不需要(也不应该)使用可变的数据结构,除非有特定的原因,我认为您的用例中没有这种原因。当正在处理一个新消息并且相应的输出存储在ktable中时,表中为该键存储的任何内容都将被覆盖--因此使用不可变的数据结构作为消息值是完全可以的。

 类似资料:
  • 我正在将日志消息写入Kafka Topic,我希望此主题的保留是永久的。我在Kafka和Kafka Connect(_schemas、连接-配置、连接-状态、连接-偏移等)中看到,有一些特殊主题不会因日志保留时间而删除。如何强制一个主题像这些其他特殊主题一样?是命名约定还是其他属性? 谢啦

  • 我正在尝试每 提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。 我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?

  • 我只是Kafka的新手,我有个问题: 我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。 当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:

  • 假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?

  • 配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日