当前位置: 首页 > 知识库问答 >
问题:

如何更新Kafka/Kafka流中的数据?

罗甫
2023-03-14

假设有Kafka主题顺序。数据以JSON格式存储:

{
   "order_id": 1,
   "status": 1
}

Status定义订单的状态(待定-1,已完成-2)。

完成后如何在“已完成”上进行更改?

正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?

共有1个答案

邵毅
2023-03-14

如果您的订单更改了状态,则更改状态的流程应在主题中生成具有新状态的新消息。kafka streams应用程序可以对新消息做出反应,进行转换、聚合或类似操作,并在新主题中输出修改/聚合的消息。。。所以你需要一个Kafka制作人,当订单状态改变时,他会向订单主题发送一条消息。

 类似资料:
  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。

  • 我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。

  • 我想用kafka流实现请求-响应模式,我使用spring boot kafka,其中添加了一些数据作为报头,命名为关联id,但是当kafka流API处理请求消息时,报头数据会丢失,无法发送到响应主题!我该怎么解决,还是用另一种方法??

  • 我的火花流应用程序从Kafka获取数据并对其进行处理。 如果应用程序失败,大量数据存储在Kafka中,并且在Spark Streaming应用程序的下一次启动时,它会崩溃,因为一次消耗了太多数据。由于我的应用程序不关心过去的数据,因此只消耗当前(最新)数据完全没关系。 我找到了“auto.reset.offest”选项,它在Spark中的行为几乎没有什么不同。如果配置了zookeeper,它会删除

  • 我是Kafka Streams的新手,我正在使用1.0.0版。我想从其中一个值为KTable设置一个新密钥。 在使用KStream时,可以使用如下方法selectKey()完成。 然而,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了KTable设计的关键?