当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka流式KTable更改日志

马泰
2023-03-14

我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。

聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。

 KTable<String, Record> countAndSum = groupedByKeyStream.aggregate(...)

这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。

然而,从长远来看,这意味着这个变更日志将永远增长(随着更多的键进入)?而且我可能有很多键(我的聚合不像count/sum那么小)。

由于我有一种方法知道我不会再获得特定键的事件(一些事件被标记为“final”),有没有一种方法让我去除更改日志中这些特定键的聚合状态,以避免它永远增长,因为我不再需要它们了,可能会有一个轻微的延迟“只是”以防万一?

或者也许有一种完全不同于Kafka流媒体的方法来避免这个“问题”?

共有1个答案

贺靖
2023-03-14

是:changelog主题配置了日志压缩,而不是保留时间。如果您接收到“final”记录,您的聚合只能返回null作为聚合结果。这将从本地RocksDB存储区以及基础的changelog主题中删除它。

 类似资料:
  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里

  • 我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 问题内容: 我试图改变对象的格式,我试图以这种方式做到这一点: 但这对对象没有任何影响,仍然是旧格式,无法真正理解为什么会这样。 问题答案: 请尝试将两个概念区分开:您的数据和向用户呈现的数据(或用于其他目的的格式设置,例如包含在JSON中)。的保持值7可以表示为(格式化成)7,07,007或7,同时仍保持刚好相同的值而没有任何格式化信息- 外的格式化谎言。同样,a 具有一个时间点,它可以表示为(