当前位置: 首页 > 知识库问答 >
问题:

kafka本地状态存储/changelog中的保留时间

洪逸清
2023-03-14

我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
    oneHour.withLoggingEnabled(topicConfig);
    events
            .map(getStringSensorMeasurementKeyValueKeyValueMapper())
            .groupByKey()
            .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                    (oneHour));

按照设计,正在具体化的信息也由changelog主题支持。

 ReadOnlyWindowStore<String, Double> windowStore =  queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
 WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1

用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存储的保留时间,这解决了我的问题:

TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);

使我的代码编写为:

...

.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                (oneHour));
...

共有1个答案

冯文彬
2023-03-14

对于窗口化的ktables,存在本地保留时间和changlog保留时间。您可以通过Materialized.WithRetentionTime(...)设置本地存储保留时间--默认值为24h。

对于较早的Kafka版本,本地存储保持时间是通过Windows#tool()设置的。

如果创建了一个新的应用程序,changelog主题的保留时间将与本地存储区的保留时间相同。但是,如果手动增加日志保留时间,这不会影响存储区保留时间,但需要相应地更新代码。当changelog主题已经存在时也是如此:如果更改本地存储保持时间,changelog主题配置不会自动更新。

还有一个Jira:https://issues.apache.org/Jira/browse/kafka-7591

 类似资料:
  • 我们有以下高级DSL处理拓扑: 简而言之,我们在上面做的是: null 其思想是创建窗口化事件计数,并将这些窗口化键用于联接和聚合操作(在KTable的情况下,这类操作没有窗口) 问题是:join和aggregate操作的状态存储没有保留机制,并导致磁盘(RocksDB)中的空间爆炸。 更具体地说:(跳跃)窗口会在键上产生笛卡尔积,并且没有删除旧窗口的机制。 请注意,支持table1和table2

  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!

  • 全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。

  • 我有一个使用处理器api更新状态存储的拓扑,配置为复制因子3,ACKS=ALL 我试图找出这个changelog主题滞后的根本原因,因为我没有在这个处理器中发出任何外部请求。有对rocksdb状态存储的调用,但这些数据存储都是本地的,检索速度应该很快。 我的问题是这个变更日志主题的使用者到底是什么?

  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 我在本地机器中运行多个kafka流消费者实例(2个实例),每个实例都有自己的自定义本地存储,每个实例的名称不同。 根据文档,如果其中一个实例发生故障,则kafka必须将死实例的存储同步到活实例的存储(如果我错了,请更正我)。 我用相同的应用程序id配置了两个实例,让kafka知道这些实例属于同一个组。 当其中一个实例被杀死时,另一个(活动)实例的存储未与死实例的存储同步。我在两个商店都启用了更改日