在WindowAsSigner
中,元素被分配给一个或多个TimeWindow
实例。在滑动事件时间窗口的情况下,这发生在SlidingEventTimeWindows#任务Windows
1中。
如果窗口的size=5
和幻灯片=1
,则将时间戳为0的元素分配到以下窗口:
在一幅图片中:
+-> Beginning of time
|
|
+----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[-1,4[ Window 2 XXXXX |
| t=[-2,3[ Window 3 XXXXX |
| t=[-3,2[ Window 4 XXXXX |
| t=[-4,1[ Window 5 XXXXX |
| |
| time(-4 to +4) ---- |
| 432101234 |
+---------------------------+------------------+
|
|
|
+
有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Flink第一个元素应该只有一个窗口(t=[4,8[Window 1
)。像这样:
+-> Beginning of time
|
|
+-----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[ 1,6[ Window 2 XXXXX |
| t=[ 2,7[ Window 3 XXXXX |
| t=[ 3,8[ Window 4 XXXXX |
| t=[ 4,9[ Window 5 XXXXX |
| |
| time(-4 to +8) ---- |
| 4321012345678 |
+---------------------------+-------------------+
|
|
|
+
一旦窗口数量达到并超过窗口大小,这将不再有效。然后,在上述情况下,所有元素都位于5个窗口内。
脚注:
org。阿帕奇。Flink。流动。应用程序编程接口。开窗。转让人。滑动事件时间窗口#分配窗口
我可能会为这个问题找到更好的解决方案。这个想法是把水印设置到将来足够远的地方,这样你的窗口就有足够的数据。早期的窗口仍然存在,但它们将被丢弃。
以下是具有周期性水印[T]的赋值器的概念证明:
class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
var t: Option[Long] = None
var firstTime = true
override def extractTimestamp(el: T, prevTs: Long): Long = {
t = Some(prevTs)
prevTs
}
override def getCurrentWatermark(): Watermark = (t, firstTime) match {
case (None, _) => return null
case (Some(v), false) => new Watermark(v)
case (Some(v), true) => {
firstTime = false
new Watermark(v + wait)
}
}
}
更新:不幸的是,它不起作用(现在我不知道为什么应该),总是有几个键在键流与早期窗口。所以最后我只是过滤错误的窗口,比如:
val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>
state match {
case None => (Seq(), Some(1))
case Some(s) => (Seq(in), Some(s))
case Some(v) => (Seq(), Some(v+1))
})
目前没有办法指定Flink作业的有效时间间隔。考虑到您可能也想将您的工作应用于历史数据,这也可能有点问题。
但是,您可以手动筛选在超时开始之前启动的窗口:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
.assignAscendingTimestamps(x => x._2)
val windowed = input
.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
.apply{ (window, iterable, collector: Collector[Int]) =>
if (window.getStart >= startTime) {
collector.collect(iterable.map(_._1).reduce(_ + _))
} else {
// discard early windows
}
}
windowed.print()
env.execute()
我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想
我试图创建一个程序,它有一个tkinter窗口打开,然后当你按下一个按钮,它关闭tkinter窗口,打开一个pyplay窗口。然而,当我点击按钮打开pyplay窗口时,它会打开pyplay窗口,而tkinter窗口保持打开状态。 代码: 我还尝试使用: 我怎样才能解决这个问题?(我正在MacOS 11.1上运行Python 3.7.7)
本文向大家介绍Oracle开发之窗口函数,包括了Oracle开发之窗口函数的使用技巧和注意事项,需要的朋友参考一下 一、窗口函数简介: 到目前为止,我们所学习的分析函数在计算/统计一段时间内的数据时特别有用,但是假如计算/统计需要随着遍历记录集的每一条记录而进行呢?举些例子来说: ①列出每月的订单总额以及全年的订单总额 ②列出每月的订单总额以及截至到当前月的订单总额 ③列出上个月、当月、下一月的订
我正在使用Pygame模块制作一个简单的游戏。我需要Tkinter窗口与Pygame窗口一起打开。 每当我试图打开两个窗口时,第二个窗口只有在我杀死第一个窗口后才会打开。 现在,我能想到的唯一解决方案是使用多线程。但是,我无法实现它。 我该怎么做呢?我真的很感激这里的帮助。谢谢你!