我的Flink工作必须在每次工作轮班后计算某个集合。换挡是可配置的,看起来类似于:
1st shift: 00:00am - 06:00am
2nd shift: 06:00am - 12:00pm
3rd shift: 12:00pm - 18:00pm
出于操作目的,每天的班次都是一样的,一周/一年中的几天之间没有区别。轮班配置可以随时间变化,并且可以不单调,因此表中留下了一个简单的EventTime窗口,如:TumblingEventTimeWindows.of(time.of(6,HOURS))
,因为一些轮班可能会缩小或超时,或者在中间插入几个小时...
我想出了一些基于GlobalWindow和自定义触发器的东西:
LinkedList<Shift> shifts;
datastream.windowAll(GlobalWindows.create())
.trigger(ShiftTrigger.create(shifts))
.aggregate(myAggregateFunction)
在我的自定义触发器中,我尝试识别传入事件是否超过了正在进行的工作轮班的结束时间,并触发该轮班的窗口:
@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
// compute the end time of the on-going shift
final Instant currentShiftEnd = ...
// fire window for the shift if the event passes the end line
if (ShiftPredicate.of(currentShiftEnd).test(element)) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
省略了状态管理和一些记忆优化的代码,这似乎在流式用例中工作得很好:shift endtime之后进入的第一个事件触发了最后一个shift的触发和聚合。
我读到这样做的原因是:
Flink只保证基于时间的窗口的删除,而不保证其他类型的删除,例如全局窗口(请参阅窗口分配器)
我已经在org.apache.flink.streaming.api.windowing
包中查看了一些类似TumblingEventTimeWindows
或DynamicEventTimeSessionWindows
的东西,可以使用这些东西,也可以使用一天中的一个结束小时进行扩展,这样,当元素的水印超过窗口限制时,我就可以依赖这些触发的默认事件时触发器,但我不确定如何做到这一点。凭直觉我会希望这样的事情:
shifts.forEach(shift -> {
datastream.windowAll(EventTimeWindow.fromTo(DAILY, shift.startTime, shift.endTime))
.aggregate(myAggregateFunction);
});
我知道对于任意复杂度的用例,有些人所做的是抛弃Windows API,以损害低级进程函数,在低级进程函数中,他们通过将元素保持为运算符的受管状态来“手动”计算窗口,而在给定的规则或条件下,他们拟合并从定义的聚合函数或累加器中提取结果。在process函数中,也可以通过点击onclose
钩子来pin point任何挂起的计算。
是否有一种方法可以通过扩展windows API中的任何对象来获得一天中某几个小时的循环事件时间窗口的概念?
如果我没有理解错的话,这里有两个独立的问题/问题需要解决:
对于(1),您使用GlobalWindows
和自定义ShiftTrigger
的方法是一种可行的方法。如果您想探索使用process函数的替代方法,我编写了一个示例,您可以在Flink文档中找到。
另一种处理方法是避免在这些作业完成时取消它们,而是使用./bin/flink stop--drain[-p savepointPath]
干净地停止作业(使用保存点),同时排出所有剩余的窗口结果(通过注入最后一个大水印(MAX_WATERMARK))。
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想
在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl
问题内容: 我有一个sql代码,可以获取每个员工的总工作时间及其超时时间。我想计算他当天的总加班时间。你能帮我吗?8小时是每天的常规时间。 这是代码 样品输出 我想要的是这样的 问题答案: 你可以做这样的事情 这是 SQLFiddle 演示 您需要为提供真正的默认值,并针对当他们。在一个极端的情况下,如果s是由员工有一天到另一天回家而造成的,那么这些默认值可能分别是和,因为您要计算每个日历日的加班
为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。