当前位置: 首页 > 知识库问答 >
问题:

Apache Flink的窗口化依赖于EventTime事件的时间戳分配

谭绍晖
2023-03-14

我是新的apache flink,并试图了解事件时间和窗口的概念是如何处理的flink。

下面是我的设想:

>

  • 我有一个程序,它以线程的形式运行,每秒创建一个包含3个字段的文件,其中第3个字段是时间戳。

    虽然每隔5秒我会在创建的新文件中输入一个旧的时间戳(可以说是t-5),但还是有一些调整。

    现在,我运行流处理作业,将上面的3个字段读入一个元组。

    现在,我定义了以下用于水印和时间戳生成的代码:

       WatermarkStrategy
      .<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(4))
      .withTimestampAssigner((event, timestamp) -> event.f2);
    
    

    然后,我使用以下代码打开上面的窗口并尝试获取聚合:

     withTimestampsAndWatermarks
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(4000)))
            .reduce((x,y) -> new Tuple3<String, Integer, Long>(x.f0, x.f1 + y.f1,y.f2))
    

    很明显,我试图聚合每个字段内的数字。(多一点上下文,我试图聚合的字段(f2)都是1)

    因此,我有以下问题:

    1. 也就是说,窗口只有4秒宽,每五个条目就有一个旧的时间戳,所以我希望下一个窗口的计数会更少。我的理解错了吗
    2. 如果我的理解是正确的——我在并行运行两个程序时没有看到任何聚合,那么我的代码是否有问题
    3. 另一个困扰我的问题是,windows的开始时间和结束时间真正依赖于哪些字段或哪些参数?是在从事件提取的时间戳上,还是在处理时间上
  • 共有1个答案

    裴英才
    2023-03-14

    您必须配置允许的延迟时间:https://nightlies.apache.org/flink/flink-docs-release-1.2/dev/windows.html#allowed-迟到。如果未配置,Flink将删除延迟消息。因此,对于下一个窗口,元素将比上一个窗口少。

    窗口由以下计算指定:

    return timestamp - (timestamp - offset + windowSize) % windowSize
    

    在您的情况下,偏移量为0(默认值)。对于事件时间窗口,时间戳是事件时间。对于处理时间窗口,时间戳是Flink操作符的处理时间。E、 g.如果windowSize=3,timestamp=122,则该元素将被分配给该窗口[120123]。

     类似资料:
    • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

    • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

    • 我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在

    • 我有数据流就像 事件名,事件id,Start_time(时间戳)... 在这里,我想对最后一个带有时间戳的字段<;code>;Start_。 因此,我在flink window中看到的是,所以我猜它需要过去30分钟的事件,但不考虑 我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用? 我是Flink的新手。 谢啦

    • 我在操作后加入两个流以创建一个新流。代码如下: oldTableAJoinTableBFunc的代码如下 上面的示例是,在事件时间上,将OldTableAdatStream连接到TableDataStream。 我发现了一个有趣的现象。flink自动创建join1中事件的时间戳。 当我创建oldTableADataStream和tableBDataStream的测试数据时,我故意设置了所有的100

    • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有