当前位置: 首页 > 知识库问答 >
问题:

KTable状态存储无限保留

幸弘光
2023-03-14

我们有以下高级DSL处理拓扑:

TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")

简而言之,我们在上面做的是:

    null

其思想是创建窗口化事件计数,并将这些窗口化键用于联接和聚合操作(在KTable的情况下,这类操作没有窗口)

问题是:join和aggregate操作的状态存储没有保留机制,并导致磁盘(RocksDB)中的空间爆炸。

更具体地说:(跳跃)窗口会在键上产生笛卡尔积,并且没有删除旧窗口的机制。

请注意,支持table1和table2的状态存储没有空间问题,这是因为管理删除旧窗口的DSL为它们提供了一个窗口存储。在联接和聚合中,我们将窗口键视为“任何旧键”,DSL也会这样做,并使用非窗口KeyValueStore。

这个问题与下列问题有关:KAFKA-4212、KAFKA-4273,汇合论坛问题

这里有什么误解的概念吗?有没有一种简单的方法来使用DSL实现这个拓扑?如果不是,有什么建议的方法来使用低层API实现它?

共有1个答案

姜育
2023-03-14

我想你可以这样做:

StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);

KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
    /* custom Transformer that set the weaker grouping key right here
       and puts the extracted component into the value before the aggregation;
       additionally (that's why we need a Transformer) get the topic name from
       context object and enrich the value accordingly (ie, third String argument in the output Tuple */);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
    timeWindow,
    /* initializer: return an empty Map;
       aggregator:
       for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
         if not, add new map entry with Pair(0,0)
       take the corresponding Pair from the Map and increase one
       counter depending on the original topic that
       is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);

示例:

假设两个输入流S1S2具有以下记录( ):

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>  
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>  
<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>
<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>

如果这是一个正确的例子(我可能没有理解您的问题描述)。您可以使用transform/groupby/aggregate执行上述操作。投入是:

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

transform的结果是(包括ts):

<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>

注意,transform实际上将两个流作为“一个流”处理,因为我们使用了模式订阅--因此,输出只是一个流,其中包含来自两个原始流的交错记录。

<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>

如果您将此结果的每个键的最新记录与上面的结果进行比较,您会看到两者都是相同的。

 类似资料:
  • 我目前正在考虑将opengl状态存储为某种适当类型的全局thread_local变量。那个设计有多糟糕?有什么陷阱吗?

  • 我试图更好地理解如何设置我的集群来运行我的Kafka-Stream应用程序。我正试图更好地了解将涉及的数据量。 在这方面,虽然我可以很快地看到一个KTable需要一个状态存储,但我想知道从一个主题创建一个Kstream是否意味着立即将该主题的所有日志以一种仅追加的方式复制到状态存储中。也就是说,特别是如果我们要公开流以供查询? 当源主题是Kstream时,当它们在源主题中移动时,Kafka是否自动

  • 我试图检查/保存我在EMR上运行的flink状态到AWS上的s3存储桶。请注意: 实例(主节点和核心节点)正确设置了IAM角色,以访问s3 bucket及其内部的所有目录/文件(AmazonS3FullAccess策略附加到该角色,没有任何内容覆盖它) jobmanager日志:

  • 问题内容: 广泛的讨论问题。是否已经有任何库可以让我在Java中存储应用程序的执行状态? 例如,我有一个处理文件的应用程序,现在该应用程序可能在某个时刻被迫关闭。我想存储所有已处理文件和未处理文件的信息,以及处理正在进行的阶段正在进行的流程。 是否已经有抽象此功能的库,或者我将不得不从头开始实现它? 问题答案: 似乎您正在寻找的是可以使用Java Serialization API 执行的序列化。

  • 我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化: 按照设计,正在具体化的信息也由changelog主题支持。 用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存

  • 在绘画的时候,经常会有这种情况,本来正在用绿色笔画,突然需要用红色笔画几笔,但画完了之后又要换成绿色笔。如果是在现实中作画,可以把笔蘸上不同的墨水,画了之后又蘸上之前的墨水,或者准备几只笔,要用哪只就选哪只。 在Canvas中也可以这样,不过Canvas中的画笔永远只有一只。所以,如果要更换画笔的颜色,就需要保存和恢复状态。状态其实就是画布当前属性的一个快照,包括: 图形的属性值,如strokeS