当前位置: 首页 > 知识库问答 >
问题:

窗口触发器不返回最新的结果

劳华灿
2023-03-14

我正在尝试测量具有窗口操作的 Flink 应用程序的延迟,如下所示:

SingleOutputStreamOperator<String> branch = stream
                .getSideOutput(outputTag2)
                .keyBy(MetricObject::getRootAssetId)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
                .aggregate(new CountDistinctAggregate(), new CountDistinctProcess())
                .name("windowed-count-distinct")
                .uid("windowed-count-distinct")
                .map((value)->String.valueOf(value.getTimestamp().toEpochMilli()))
                .name("send-timestamp");

我正在考虑事件时间并提取时间戳,我使用这个水印策略:

                        .<SingleRecord>forBoundedOutOfOrderness(Duration.ofSeconds(15))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp().toEpochMilli()))

聚合函数将特定对象保存为累加器,其中还包含提取的时间戳;这些时间戳写在 kafka 主题中。问题是返回的时间戳如下:

1639651859988
1639651890163
1639651904900
1639651919728
1639651919728
1639651949973
1639651965085
1639651979870

返回的时间戳并不像我预期的那样间隔相等,第四个和第五个是相等的,但它们的返回间隔为15秒,这是不可能的,因为应用程序记录的输入每秒(每秒10个)连续生成。在其他测试中,我也遇到了更糟糕的情况,比如:

1639651979870
1639651992771
1639651992771
1639651992771
1639651992771
1639652189791
1639652205001
1639652219876

奇怪的事实是,当我使用一个简单的翻滚窗口而不触发时:

 .window(TumblingEventTimeWindows.of(Time.seconds(15)))

返回的时间戳按预期等距分布:

1639652429766
1639652444930
1639652459900
1639652474609
1639652489746
1639652504862
1639652519734
1639652534847

我真的不明白这是问题所在,似乎聚合函数中的累加器没有正确升级。

共有1个答案

江阳冰
2023-03-14

我认为您可以检查Flink streaming的输入数据,以验证结果是否符合预期。

对于第一个流式传输,聚合操作在60秒窗口的数据集中运行4次(每15秒)。不确定您在聚合中的逻辑。例如。假设我们有一个3秒的窗口,触发器是每1秒一次。运算符是在窗口中获取max元素。还假设输入每1秒生成一次。如果输入是1、3、2、...,那么我们将看到Flink的输出,如1、3、3...,因为第一个窗口在其窗格中有[1、3、2],对于每个获取max元素的触发器,结果将是1、3、3。

对于第二个流式作业,每个窗口都有一个触发时间,因此,例如,使用上面的输入作为例子,如果窗口是1秒,我们将得到1、3、2、...

 类似资料:
  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • 我有这个脚本: HTML中的用法: 由于某些原因,它的工作,如果页面重新加载一半,但不,它没有开火,在它工作之前,所以我不知道发生了什么。我在wordpress网站上使用这个。

  • 我想记录从火花结构化流的传入流读取到数据库的记录数。我正在使用ForeachBatch来转换传入的流批处理和写入所需的位置。我想日志0记录读取,如果在一个特定的小时没有记录。但是当没有流时,Foreach批处理不会执行。有人能帮我吗?我的代码如下:

  • 我使用Flink SQL计算基于事件时间的窗口分析。在我的数据源每天晚上空闲之前,一切都正常工作,之后直到第二天数据再次开始流动时才产生最后一分钟的结果。 我已尝试将<code>设置为table.exec.source。空闲超时,但没有帮助。我能做什么?

  • null 然而,当我运行这个管道时,我甚至在通过窗口结束之前就得到了早期结果 这可能是什么原因?

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?