当前位置: 首页 > 知识库问答 >
问题:

在窗口末尾是否强制清除窗口状态对象?

谷梁嘉运
2023-03-14

我正在使用window API将数据划分为1小时的窗口。在每个窗口中,我使用一个值状态为每个窗口存储一个布尔值。

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
    @Override
    public long extractTimestamp(Event element) {
        return element.timestamp;
    }
})

// Partition by user
.keyBy(new KeySelector<Event, Tuple2<Long, String>>() {
    @Override
    public Tuple2<Long, String> getKey(Event value) {
        return Tuple2.of(value.userGroup, value.userName);
    }
})

.window(TumblingEventTimeWindows.of(Time.minutes(60), Time.minutes(0)))
.allowedLateness(Time.days(1))
.trigger(new WindowTrigger<>(EVENTS_THRESHOLD))
.aggregate(new WindowAggregator(), new WindowProcessor())

.print();
public class WindowProcessor extends ProcessWindowFunction<Long, String, Tuple2<Long, String>, TimeWindow> {

    private final ValueStateDescriptor<Boolean> windowAlertedDescriptor = new ValueStateDescriptor<>("windowAlerted", Boolean.class);

    @Override
    public void process(Tuple2<Long, String> key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        long currentDownloadsCount = elements.iterator().next();
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();

        ValueState<Boolean> windowAlertedState = context.windowState().getState(windowAlertedDescriptor);
        if (BooleanUtils.isTrue(windowAlertedState.value())) {
            return;
        }

我必须调用“clear()”方法来清理窗口状态数据吗?我假设,因为Flink处理窗口创建和清除,所以它在清除窗口时也应该处理状态清理。

但是Flink文档明确表示,您应该调用clear方法来删除窗口状态https://ci.apache.org/projects/Flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-windown-state-in-processwindowfunction

共有1个答案

寿伟
2023-03-14

window API中涉及的各种类在多个位置保持状态:

  • 分配给每个窗口
  • 的流记录列表 触发器可以是有状态的(例如 CountTrigger)
  • 每窗口状态(ProcessWindowFunction.Context)
  • 全局状态(也在ProcessWindowFunction.Context)

前两个(窗口内容和触发器状态)在清除窗口时由Flink自动清除。清除窗口时,Flink还调用ProcessWindowFunction上的clear方法,并且您应该清除您当时在KeyedStateStoreWindowState()中创建的任何每个窗口状态。

另一方面,keyedStateStore globalState()的目的是记住从一个窗口到另一个窗口的内容,因此您不会清除这些内容。但是,如果您有一个无界的键空间,您应该注意为过时的键清理全局窗口状态。唯一的方法是在全局状态的状态描述符上指定状态TTL。

 类似资料:
  • 这是我的代码。我的问题如下 > 以这种方式清除状态是否正确? 这是使用keyBy的正确方法吗? //有100万个storeId

  • 我的应用程序使用了一个键控窗口,该窗口由时间戳函数键控。这意味着一旦该特定窗口被激发和处理,保持该键活动就没有用了,因为该特定键不会再次出现。因为这个用例涉及到持续扩展键,所以我想在一个键完成处理后立即清除它的状态,而不必配置计时器。 在每个键控窗口完成处理后,这是否可以在方法或方法中实现?

  • 我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期

  • 给对话框,确认信息框,或者其他内容使用模态时可以调用。为了使模态工作,你需要给模态一个 Id 来关联触发器。增加一个关闭按钮,只要增加类 .modal-close 到你的关闭按钮上。 模态的 HTML 结构 <!-- Modal Trigger --> <a class="waves-effect waves-light btn" href="#modal1">模态</a> <!-- Modal

  • 我有两条流: 测量 WhoMeasured(关于谁进行了测量的元数据) 这些是它们的案例类: 流包含大量数据。流几乎没有任何可用性。事实上,对于<code>who_measured_id</code>流中的每个<code>who_。这本质上是一个哈希表,由流填充。 在我的自定义窗口函数中 这是我的工作。现在你可能会看到,有一些东西不见了:两个流的结合。 因此,从本质上讲,这是一种查找表,当流中的新

  • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi