假设我使用kafka streams(kafka-streams-scala库,版本2.2.0)。
我想出了一种可能的方法:创建流和可变映射,然后使用stream.foreach
来跟踪每个键的N个最近值。
val stream: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topicName")
val map = scala.collection.mutable.Map[String, List[MyObject]]
stream.foreach((k, v) => {
//update map
})
我的问题是是否有更好的方法来实现这一点--或者使用streams API,或者至少不使用可变映射。
所以我需要像KTable或GlobalKTable这样的东西,但它们只保留一个值。
继续使用ktable
(或globalktable
),但使用结构化值和/或集合作为“值”。Kafka中没有强制要求您将消息值仅限制为基元数据类型(如integer
或string
)。
想一想:kstream
。在这里,每个消息都属于一个特定的用户(由userid
键标识),并且每个消息都有与该用户关联的0个、1个或多个ClickEvent
的列表。这“只是工作”,您只需要为您想要使用的数据类型拥有适当的serdes(序列化器/反序列化器)。
例如,https://github.com/confluentinc/kafka-streams-examples中的CustomStreamTableJoin
示例(直接链接到Apache Kafka V2.2的V5.2.1示例)使用pair
类在Kafka的消息值中存储一个元组,并且它附带了pairserde
。对于存储值的集合也可以这样做(开发人员正在这样做),比如列表
,正如您在自己的用例中提到的那样。
我需要保留几个最近的key值在kafka主题使用kafka流。[...]我想出了一种可能的方法:创建流和可变映射,[...]
您不需要使用映射
。Kafka消息中已经可以使用该键,因此您只需要为消息值提供一个类似列表的数据类型。
或者至少没有可变映射。
您不需要(也不应该)使用可变的数据结构,除非有特定的原因,我认为您的用例中没有这种原因。当正在处理一个新消息并且相应的输出存储在ktable
中时,表中为该键存储的任何内容都将被覆盖--因此使用不可变的数据结构作为消息值是完全可以的。
我正在将日志消息写入Kafka Topic,我希望此主题的保留是永久的。我在Kafka和Kafka Connect(_schemas、连接-配置、连接-状态、连接-偏移等)中看到,有一些特殊主题不会因日志保留时间而删除。如何强制一个主题像这些其他特殊主题一样?是命名约定还是其他属性? 谢啦
我正在尝试每 提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。 我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?
我只是Kafka的新手,我有个问题: 我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。 当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:
假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?
配置的application.yaml如下所示。这个想法是我有3个输入和3个输出主题。该组件从input topic获取输入,并将输出提供给OutputTopic。 引发的异常为 谁能帮助我与Kafka Streams Spring-Kafka代码样本处理与多个输入和输出主题。 更新:2021年1月21日