当前位置: 首页 > 知识库问答 >
问题:

kafka流中的聚合和状态存储保持

凌昕
2023-03-14

我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));

有没有更好的方法使用DSL来编写这个逻辑?

上面代码中关于聚合创建的状态存储的两个问题。

    null

提前道谢!

共有1个答案

狄楷
2023-03-14

>

  • 默认情况下,将使用持久的RocksDB存储。如果要使用内存中的存储,可以传入materialized.as(stores.InMemoryKeyValueStore(...))

    如果您有无限数量的唯一键,您最终将耗尽主存或磁盘,您的应用程序将死亡。根据您的语义,您可以通过使用带有大“gap”参数的会话窗口聚合来获得“TTL”,而不是使用过期的旧密钥。

    在处理新数据之前,该状态将始终被还原。如果您使用内存存储,则会通过使用基础的changelog主题来实现这一点。根据您所在州的大小,这可能需要一段时间。如果使用持久的RocksDB存储,状态将从磁盘加载,因此不需要还原,处理应该立即发生。只有在松开本地磁盘上的状态时,才会在这种情况下从changelog主题进行还原。

  •  类似资料:
    • 我有一个KStream,其中包含从主题到1的数据,如下所示: 和KTable,构造如下: 稍后,主题To2中出现以下消息: 现在,我希望我的KTable能够反映这些变化,并且看起来像这样: 但看起来是这样的: 我想我缩小了范围:显然聚合的只在第一次调用--之后聚合总是接收作为最后一个参数,例如。 其中,在第一次调用(通过初始值设定项创建)时为,但在第二次调用时为。 有什么想法吗? 编辑2 编辑3

    • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

    • 我用状态存储构建了一个kafka流媒体应用程序。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,Kafka会随机拆分分区和状态存储。 例如: Instance1获取:分区-0,分区-1 Instance2获取:partition-2,stateStore-repartition-0 Instance3获取:stateStore-重新分区-1,stateStore-重新分区-2

    • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

    • 下面是一个简单的场景: 用户单击“Create order”:将创建一个订单(首先保持其状态=NEW) 用户完成订单填写后,单击SAVE-->state is now Submited 当另一个检查订单并验证它时,必须进行一个过程。只有在调用了其他一些服务并给予许可时,才会验证该订单。 整个工作流程是: null null 谢谢

    • 我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化: 按照设计,正在具体化的信息也由changelog主题支持。 用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存