当前位置: 首页 > 知识库问答 >
问题:

时间戳的链接窗口

江航
2023-03-14

我有数据流就像

事件名,事件id,Start_time(时间戳)...

在这里,我想对最后一个带有时间戳的字段<;code>;Start_。

因此,我在flink window中看到的是.timeWindow(Time.minutes(30)),所以我猜它需要过去30分钟的事件,但不考虑开始时间

我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用过滤器

我是Flink的新手。

谢啦

共有1个答案

岳安福
2023-03-14

你必须做两件事:

  1. 通过在 StreamExecutionEnvironment 上调用 setStreamTimeCharacteristic(TimeCharacteristic.EventTime)来启用事件时间处理。
  2. 为记录和水印分配时间戳。这是通过使用 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks 通过调用 'DataStream.assignTimestamps(yourAssigner) 来完成的。

在事件-时间模式下,Flink将根据您分配给记录的时间戳构建窗口。水印告诉Flink“你的数据的逻辑时间”。水印1000意味着不会再有时间戳小于1000的记录。

事件时处理的整个主题太复杂了,无法在这里讨论。我建议看看Apache Flink文档。

 类似资料:
  • 我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在

  • 我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。 我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和Tu

  • 我知道这是一个非常常见的问题,但我觉得我找到的答案并没有真正解决问题。我将概述我的具体用例,并对来自其他SO答案和网络的信息进行总结。 对于我正在编写的服务,数据库条目被创建并存储在移动设备和我们的网站上,需要以两种方式同步。我们目前的目标是Android和iOS,它们都使用sqlite作为关系数据库。服务器端是使用Django和MySQL在Python中实现的,但将来可能会有其他解决方案取代它。

  • 我有一个Flink程序,它接受两个流,即数据/传感器读数流和警报规则流。我正在广播规则流,并将其连接到数据流以生成动态警报。ProcessingTime的一切都很好,但EventTime却不行。我已经分配了时间戳 > 当两个流(即带有时间戳的流)同时出现时,如何使用“EventTime”生成警报 我是否也必须为我的规则流分配时间戳和水印? 因为我的规则流只有在有任何添加/修改时才会有记录。是否有任

  • 问题内容: 我可以看到例如在这里进行了几次讨论,但是我认为由于Elasticsearch中的重大更改,解决方案已过时。 我正在尝试将我在Kafka主题中的Json中的long / epoch字段转换为通过连接器推送的Elasticsearch日期类型。 当我尝试添加动态映射时,我的Kafka连接更新失败,因为我试图将两个映射应用于字段_doc和kafkaconnect。我认为这是关于版本6的重大更

  • 本文向大家介绍sqlite时间戳转时间语句(时间转时间戳),包括了sqlite时间戳转时间语句(时间转时间戳)的使用技巧和注意事项,需要的朋友参考一下 下面是具体的实现代码: