当前位置: 首页 > 知识库问答 >
问题:

Flink中的会话窗口会产生意外结果

皇甫雨华
2023-03-14

我有一个由两个字段“键控”的记录流,然后分配一个间隔为30秒的会话窗口。我使用附加在记录上的“时间戳”作为事件时间。我正在使用“Assign AscendingTimeStamps”水印。

以下面的记录为例。该流由(用户,place)键控。

Record1:user1,place1,timestamp t1

Record2:user2,place1,timestamp在t1之后30秒

桶1

Record1:user1,place1,timestamp t1

Record3:user1,place1,timestamp在t1的30秒内

共有1个答案

茅炯
2023-03-14

问题是assignascendingtimestamps要求您的时间戳在所有键上单调增加。这是因为Flink不能生成每个密钥的水印。

由于Flink不能生成每个密钥水印,因此必须生成水印,使它们对所有元素都有效。如果每个键的时间戳都是单调的,但不是所有键的时间戳都是单调的,那么您必须定义两个键之间的最大无序度(时间戳的差异)。通过从元素的时间戳中减去这种无序性,您将获得一个有效的水印。另请参见BoundedoutoFordernesTimeStampExtractor。然而,请注意,如果元素到达时出现了较大的无序,那么它也会中断。

 类似资料:
  • 我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情

  • 对于一个类,我得到了一个由base64编码的salted sha-256散列密码组成的文件。 文件的格式为: 用户名:base64编码sha256密码:salt 我最初的想法是用base64对哈希进行解码,这样我就可以得到: 用户名:salted散列密码:salt 然后通过JTR或hashcat运行它来破解密码。 我的问题是在base64解码过程中。 我的代码看起来像: /*备选方案#1:为散列*

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数 尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不

  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点