当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink:窗口函数和时间的开始

翟英达
2023-03-14

WindowAsSigner中,元素被分配给一个或多个TimeWindow实例。在滑动事件时间窗口的情况下,这发生在SlidingEventTimeWindows#任务Windows1中。

如果窗口的size=5幻灯片=1,则将时间戳为0的元素分配到以下窗口:

  1. 窗口(开始=0,结束=5)
  2. 窗口(开始=-1,结束=4)
  3. 窗口(开始=-2,结束=3)
  4. 窗口(开始=-3,结束=2)
  5. 窗口(开始=-4,结束=1)

在一幅图片中:

                            +-> Beginning of time
                            |
                            |
+----------------------------------------------+
|     size = 5              +--+ element       |
|    slide = 1              |                  |
|                           v                  |
| t=[ 0,5[ Window 1         XXXXX              |
| t=[-1,4[ Window 2        XXXXX               |
| t=[-2,3[ Window 3       XXXXX                |
| t=[-3,2[ Window 4      XXXXX                 |
| t=[-4,1[ Window 5     XXXXX                  |
|                                              |
| time(-4 to +4)        ----                   |
|                       432101234              |
+---------------------------+------------------+
                            |
                            |
                            |
                            +

有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Flink第一个元素应该只有一个窗口(t=[4,8[Window 1)。像这样:

                            +-> Beginning of time
                            |
                            |
+-----------------------------------------------+
|     size = 5              +--+ element        |
|    slide = 1              |                   |
|                           v                   |
| t=[ 0,5[ Window 1         XXXXX               |
| t=[ 1,6[ Window 2          XXXXX              |
| t=[ 2,7[ Window 3           XXXXX             |
| t=[ 3,8[ Window 4            XXXXX            |
| t=[ 4,9[ Window 5             XXXXX           |
|                                               |
| time(-4 to +8)        ----                    |
|                       4321012345678           |
+---------------------------+-------------------+
                            |
                            |
                            |
                            +

一旦窗口数量达到并超过窗口大小,这将不再有效。然后,在上述情况下,所有元素都位于5个窗口内。

脚注:

  1. org。阿帕奇。Flink。流动。应用程序编程接口。开窗。转让人。滑动事件时间窗口#分配窗口

共有2个答案

贺正祥
2023-03-14

我可能会为这个问题找到更好的解决方案。这个想法是把水印设置到将来足够远的地方,这样你的窗口就有足够的数据。早期的窗口仍然存在,但它们将被丢弃。

以下是具有周期性水印[T]的赋值器的概念证明:

  class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
    var t: Option[Long] = None
    var firstTime = true

    override def extractTimestamp(el: T, prevTs: Long): Long = {
      t = Some(prevTs)
      prevTs
    }

    override def getCurrentWatermark(): Watermark = (t, firstTime) match {
      case (None, _) => return null
      case (Some(v), false) => new Watermark(v)
      case (Some(v), true) => {
        firstTime = false
        new Watermark(v + wait)
      }
    }
  }

更新:不幸的是,它不起作用(现在我不知道为什么应该),总是有几个键在键流与早期窗口。所以最后我只是过滤错误的窗口,比如:

val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>      
  state match {
    case None    => (Seq(), Some(1))
    case Some(s) => (Seq(in), Some(s))
    case Some(v) => (Seq(), Some(v+1))
  })
牛兴安
2023-03-14

目前没有办法指定Flink作业的有效时间间隔。考虑到您可能也想将您的工作应用于历史数据,这也可能有点问题。

但是,您可以手动筛选在超时开始之前启动的窗口:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val startTime = 1
val windowLength = 2
val slide = 1

val input = env.fromElements((1,1), (2,2), (3,3))
               .assignAscendingTimestamps(x => x._2)

val windowed = input
      .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
      .apply{ (window, iterable, collector: Collector[Int]) =>
         if (window.getStart >= startTime) {
           collector.collect(iterable.map(_._1).reduce(_ + _))
         } else {
           // discard early windows
         }
       }

windowed.print()

env.execute()
 类似资料:
  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想

  • 我试图创建一个程序,它有一个tkinter窗口打开,然后当你按下一个按钮,它关闭tkinter窗口,打开一个pyplay窗口。然而,当我点击按钮打开pyplay窗口时,它会打开pyplay窗口,而tkinter窗口保持打开状态。 代码: 我还尝试使用: 我怎样才能解决这个问题?(我正在MacOS 11.1上运行Python 3.7.7)

  • 本文向大家介绍Oracle开发之窗口函数,包括了Oracle开发之窗口函数的使用技巧和注意事项,需要的朋友参考一下 一、窗口函数简介: 到目前为止,我们所学习的分析函数在计算/统计一段时间内的数据时特别有用,但是假如计算/统计需要随着遍历记录集的每一条记录而进行呢?举些例子来说: ①列出每月的订单总额以及全年的订单总额 ②列出每月的订单总额以及截至到当前月的订单总额 ③列出上个月、当月、下一月的订

  • 我正在使用Pygame模块制作一个简单的游戏。我需要Tkinter窗口与Pygame窗口一起打开。 每当我试图打开两个窗口时,第二个窗口只有在我杀死第一个窗口后才会打开。 现在,我能想到的唯一解决方案是使用多线程。但是,我无法实现它。 我该怎么做呢?我真的很感激这里的帮助。谢谢你!