当前位置: 首页 > 知识库问答 >
问题:

Apache Flink翻滚窗口延迟结果

淳于昊然
2023-03-14

使用翻滚窗口的apache flink应用程序遇到问题。窗口大小是10秒,我希望每隔10秒有一个resultSet数据流。然而,当最新窗口的结果集总是延迟时,除非我将更多数据推送到源流。

例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多条记录推送到源流,然后停止查看日志,则不会发生任何事情。

我在“01:37:XX”上再次推送一些数据,然后将在“01:33:40.0”和“01:34:00.0”之间获得窗口的结果集,这是不期望的,因为下游接收器逻辑期望及时得到结果集。

如有任何改进建议,我们将不胜感激。谢谢

以下是日志:

"log timestamp": "2019-11-15 01:37:45",
"message": "resultSet output: CLASS: 13 CNT: 1 from: 2019-11-15 01:33:40.0 to: 2019-11-15 01:34:00.0\n",

下面是代码片段:

Table resultTable = tableEnv.sqlQuery(""+
    "SELECT " +
    "  CAST (N02_001 AS VARCHAR(10)) AS RAILWAY_CLASS, " +
    "  COUNT(*) RAILWAY_CLASS_COUNT, " +
    "  TUMBLE_START(rowtime, INTERVAL '20' SECOND) as WINDOW_START, " +
    "  TUMBLE_END(rowtime, INTERVAL '20' SECOND) as WINDOW_END " +
    " FROM Inputs " +
    " GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND), CAST (N02_001 AS VARCHAR(10))");


TupleTypeInfo<Tuple4<String, Long, Timestamp, Timestamp>> tupleType = new TupleTypeInfo<>(
    Types.STRING,
    Types.LONG,
    Types.SQL_TIMESTAMP,
    Types.SQL_TIMESTAMP);

DataStream<Tuple4<String, Long, Timestamp, Timestamp>> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

resultSet
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
    String output = "CLASS: " + value.f0 + " CNT: " + value.f1 + " from: " + value.f2 + " to: " + value.f3 + "\n";
    log.warn("resultSet output: " + output);
    return value;
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP));

共有1个答案

左华灿
2023-03-14

这是预期的行为,您正在使用EventTime,这意味着用于关闭窗口和跟踪应用程序中时间流的水印来自事件时间戳。这意味着如果没有事件,就不会有时间流,因此现在将生成窗口。这就是你所观察到的。

您所经历的行为很可能是因为您正在使用带有标点水印的赋值器,它会为每个事件发出时间戳和水印。如果切换到AssignerWithPeriodicWatermark,即使不存在数据,也应生成水印,并关闭

 类似资料:
  • 我正在使用翻滚窗口(5分钟)和,因为我的源代码来自Kafka。但是窗口总是运行超过5分钟。有人能建议吗?

  • 我有一个使用flink应用程序的场景,该应用程序接收以下格式的数据流: {“event\u id”:“c1s2s34”,“event\u create\u timestamp”:“2019-03-07 11:11:23”,“amount”:“104.67”} 我使用下面的滚动窗口来查找过去60秒内输入流的总和、计数和平均值。 键值。时间窗口(时间秒(60)) 然而,我如何标记聚合结果,以便我可以说

  • 在我的控制器中,我返回由另一个线程填充的延迟结果: FooController.java 页面准备者.java 显然,再次调用所有筛选器以“完成”请求。我的一个过滤器,需要访问 豆,这是请求范围的。该调用引发异常 查看我配置的异常: 但是没有区别。我该怎么解决这个问题?

  • 我有以下用例,如果有明显的解决方案,很抱歉,但我对Flink非常陌生: 谢谢

  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我希望火花流的输出在翻转窗口的末端发送到水槽,而不是在批处理间隔。 我从一个Kafka流读取并输出到另一个Kafka流。 查询和写入输出的代码如下: 当我在一分钟的窗口内为一个特定用户发送多个记录时,我希望在一分钟结束时这些事件的总数。 但我在输出Kafka流上获得了多个输出,并在其中写入了间歇聚合。 如。 我将在一分钟内发送以下7条记录,但间隔一定时间。 我得到的结果是: 可以看到,输出在同一个