当前位置: 首页 > 知识库问答 >
问题:

timeWindow不考虑事件时间

童浩言
2023-03-14

我正在尝试Flink对从CSV文件加载的(排序的)时间戳事件进行基本聚合。

我告诉Flink使用活动时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

然后我在KeyedStream上使用一个时间窗口

val distances = signals
  .assignAscendingTimestamps(_.ts)
  .map(s => (s.mmsi, s.ts, getPortDistance(s)))
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .sum(2).print()

问题是,将窗口更改为10分钟实际上会在该时间过后打印结果!

我的理解是,通过明确告诉Flink使用时间戳字段作为事件时间,操作将不依赖于机器上的实时性。我错过什么了吗?

共有1个答案

卫嘉谊
2023-03-14

首先,您必须了解水印以及如何生成水印。

什么是水印?

一般来说,水印是一种声明,表明到流中的那个点,达到某个时间戳的所有事件都应该已经到达。一旦水印到达操作员,操作员可以将其内部事件时间时钟提前到水印的值。有关更多详细信息,请查看官方文档。

如何生成水印?

因为您调用了assignAscendingTimestamps函数,这意味着您的水印是(最新接收元素的时间戳-1)。因此,您将获得升序水印,无法检索无序元素。

如何解决这个问题?

定义自己的水印时间戳。您可以查看“assignAscendingTimestamps”的详细实现,并尝试编写自己的。

 类似资料:
  • 我观察到每次发出“编译”任务时,SBT都会编译所有源文件,而不管上次编译后的时间戳如何。以下是我的build.sbt文件: 以下是我的项目结构(忽略的项目和目标目录): 出于测试目的,这两个源文件只是空的对象定义。 当我输入“sbt编译”时,我得到了以下信息: 我可以在目标目录中找到新编译的类文件。 没有修改任何源文件,一分钟后,再次键入“sbt compile”,我得到了相同的信息和类文件,只是

  • 时间轴事件模型显示并录制触发的所有事件。使用时间轴事件参考可了解有关每个时间轴事件类型的详细信息。 通用的时间轴事件属性 某些细节存在于所有类型的事件中,而一些仅适用于某些事件类型。这个部分列出了不同事件类型的通用属性。某些事件类型的特定属性在以下事件类型的引用中列出。 属性 显示详情 Aggregated time 聚合时间 对于具有嵌套事件的事件,每个事件类别所花费的时间。 Call Stac

  • 问题内容: 我的名字和姓氏后面都有空格。我有两个SQL查询-即使我没有空格搜索,第一个查询也会返回结果。(第一个查询返回了不想要的结果)。 此行为在所有版本的SQL Server中都一致吗? 这是已知行为吗?难道在任何地方记录的? 代码 问题答案: 请参阅http://support.microsoft.com/kb/316626 SQL Server遵循有关如何将字符串与空格进行比较的ANSI

  • 我在Git中有两个主要分支:和。 我的分支的结构如下: 在我执行合并之前,和在提交、和的master中有一个共同的父级 ,我删除了一些文件(比如和)当它们仍然存在于分支中的提交中时,它们不正确。 因此,当我通过创建提交来执行三方合并以加入和时,不再包含foo和bar!Git没有以任何方式通知我他们失踪的消息。 在我看来,因为,和只是在提交之后重新播放,因此和都消失了。 Git合并的这种行为对我来说

  • 有没有一个时区,我可以使用,以考虑到夏令时?我知道'EDT'是指夏令时,而'EST'不是。是否有一个时区我可以使用,将自动检测日期时间是否是夏令时?“ET”或“America/New_York”能行吗? 编辑:: 抱歉应该更清楚。我想取一个日期并将其转换为东部时区,但我希望它考虑到东部时区的日期是否为夏令时。

  • 有时,一个任务通知另一个异步执行的任务发生了特定的事件很有用,因为第二个任务要等到特定事件发生之后才能继续执行。事件也许是数据已经初始化,也许是计算阶段已经完成,或者检测到重要的传感器值。这种情况,什么是线程间通信的最佳方案? 一个明显的方案就是使用条件变量(condvar)。如果我们将检测条件的任务称为检测任务,对条件作出反应的任务称为反应任务,策略很简单:反应任务等待一个条件变量,检测任务在事