假设我有一个股票市场交易事件流,如下所示:
technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . .
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018
使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如:
12.52, 19.25, 09.11, 17.54, 120532, GOOG, 1/1/2017
14.37, 29.52, 01.53, 12.96, 627156, MSFT, 1/1/2017
(请注意,这些交易价格/交易量完全是虚构的。
假设我想创建一个大小为2、时间间隔为1天的窗口,这样我们的数据看起来就像这样:
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .
这很好,但这是有问题的,因为股票市场交易日期不是连续的......换句话说,如果我正确地理解了 Flink 的机制(我可能是错的),那么使用事件时间滑动窗口的问题如下:
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);
在这样的数据中,日期值不是连续的(这意味着它们遵循一个离散的系列,其中包含一个或多个缺失日的不连续性),因为没有股票市场关闭日期的股票市场数据,例如在假期或周末。因此,考虑到这一点,我们的流实际上最终会看起来更像这样(因为交易在2017年12月30日、2017年12月31日和2018年1月1日关闭):
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
如何让我的 Flink 流忽略缺少的日期(而不是窗口或联接或映射连续的非丢失日期),以便我的流看起来像这样:
[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
?
(注意:请忽略我用字符串“technical”(如technical1、technical2等)递增数字的方式,因为正如我已经提到的,该值仅用于本文中的描述目的,实际上不存在于数据中。确定两个交易条目是否连续的唯一方法是按股票代码对它们进行分组,并按交易日期排序。让我们假设不存在重复事件。)
如果我理解正确,你的问题是,因为有一些确定的时期,你没有收到事件,那么窗口就不会正常运行,因为它们不知道时间的流逝。
一种选择是像这样发出一个水印:
streamEnvironment.addSource(new SourceFunction<Object>() {
@Override
public void run(final SourceContext<Object> ctx) {
(...)
ctx.emitWatermark(new Watermark(timestamp));
}
@Override
public void cancel() {
}
})
请记住,如果您在水印之前接收到事件,它们将被忽略,因此水印发射的周期性是“窗口精度”(尽快发射)和对后期事件的容忍度之间的折衷。
我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗
输入: 结果:
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1
当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?
在流处理问题中,我们有3个传感器,每个传感器每8毫秒生成一个时间戳样本(传感器的时间是同步的)。所以我想合并每个时间戳的数据(对于3个传感器,我们应该为每个时间戳输出3个合并的样本数据)。此外,我们有一个160毫秒的时间限制,这样每个数据在生成时间戳后最多应该在160毫秒后输出。所以我决定使用Flink事件时间概念和时间窗口。因为时间戳在每个传感器的样本中都是唯一的,所以我们认为它是数据流的关键。