当前位置: 首页 > 知识库问答 >
问题:

Kafka流:KTable物化

司寇研
2023-03-14

如何识别主题的KTable物化何时完成?

例如,假设KTable只有几百万行。下面的伪代码:

KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows

在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”);

跟进问题:

约束
1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同一个键发送多个条目吗。

因此,除非压缩过程尝试清除这些条目并只保留最新的条目,否则下游应用程序将消耗来自主题的相同密钥查询的所有可用条目,从而导致重复。即使压缩过程进行了某种级别的清理,在给定的时间点,当压缩过程正在追赶时,总是不可能有一些键具有多个条目。

2)ReadOnlyKeyValueStore可能允许从存储区进行受控检索,但它仍然缺乏调度检索键、值和写入主题的方法,这需要额外的编码。

可以改进API以允许受控的物化吗?

共有1个答案

欧阳鸿哲
2023-03-14

KTable物化永远不会完成,您也不能“调用”to()

当您使用Streams API时,您将一系列操作符“插在一起”。实际的方法调用,不触发任何计算,但修改操作符的DAG。

只有通过kafkastreams#start()开始计算之后,才会处理数据。请注意,在计算开始后,您指定的所有运算符都将连续并发地运行。

是的。您必须将KTable看作是一个“版本化表”,它在条目更新时随时间演变。因此,所有更新都被写入changelog主题,并作为变更记录向下游发送(注意,KTables也进行了一些缓存,以“消除”对同一键的连续更新的重复:参见https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)。

将消耗来自主题的相同密钥查询的所有可用条目,从而导致重复。

我不认为这些是“重复的”,而是更新的。是的,应用程序需要能够正确处理这些更新。

如果我们有一种方法来安排物化,那将有助于避免重复。

物化是一个连续的过程,只要输入主题中有新的输入记录可用并进行处理,就会更新KTable。因此,在任何时间点都可能有特定密钥的更新。因此,即使您完全控制何时向changelog主题和/或下游发送更新,稍后也可能会有新的更新。这就是流处理的本质。

此外,还可以减少主题中持久化的数据量(增加存储),增加网络流量,增加压缩过程的额外开销来清理它。

 类似资料:
  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇

  • 我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。 聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。 这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。 然而,从长远来看,这意味着这个变更日志将永远增长(随着更

  • 我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象: 我想对这个KTable做一些聚合,基本上保留每个的记录数。所需的KTable数据如下所示: 假设属于上面的组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示: 我希望得到的聚合KTable结果反映这一点。例如。 我可能过度概括了这里