当前位置: 首页 > 知识库问答 >
问题:

Kafka流-更新KTable上的聚合

澹台衡
2023-03-14

我有一个KTable,数据如下所示(key=>value),其中keys是客户ID,而value是包含一些客户数据的小型JSON对象:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我想对这个KTable做一些聚合,基本上保留每个age_group的记录数。所需的KTable数据如下所示:

"18-24" => 3
"25-30" => 1

假设Alice属于上面的18-24组,她的生日使她进入了新的年龄组。支持第一个KTable的状态存储现在应该如下所示:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我希望得到的聚合KTable结果反映这一点。例如。

"18-24" => 2
"25-30" => 2

我可能过度概括了这里所描述的问题:

在Kafka的溪流中,没有最终聚合这回事。根据您的用例,手动去重复将是解决问题的一种方法。“

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well

我使用的拓扑看起来像:

dataKTable = builder.table("compacted-topic-1", "users-json")
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts")

现在,从最初的,空的状态来看,一切都是这样的:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)

现在,让我们向compacted-topic-1发送一条消息,该消息在上面以KTable的形式流化。以下是发生的情况:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0
    null

共有1个答案

满雨石
2023-03-14

如果您的原始ktable包含id->json数据(我们称之为dataktable),那么您应该能够通过以下方式获得所需的内容

KTable countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key*/)
                .count("someStoreName");

这应该适用于Kafka的Streams API的所有版本。

更新

 类似资料:
  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。 聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。 这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。 然而,从长远来看,这意味着这个变更日志将永远增长(随着更

  • 如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同

  • 我有一个用例,我的KTable是这样的。 KTable:orderTable 键:值 KTable:此表将位于groupBy值上,且计数列值将具有和 键:值

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我