在流处理问题中,我们有3个传感器,每个传感器每8毫秒生成一个时间戳样本(传感器的时间是同步的)。所以我想合并每个时间戳的数据(对于3个传感器,我们应该为每个时间戳输出3个合并的样本数据)。此外,我们有一个160毫秒的时间限制,这样每个数据在生成时间戳后最多应该在160毫秒后输出。所以我决定使用Flink事件时间概念和时间窗口。因为时间戳在每个传感器的样本中都是唯一的,所以我们认为它是数据流的关键。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(160)) {
@Override
public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
return element.f1 ;
}
}).keyBy(1).timeWindow(Time.milliseconds(160))
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateOutputTag)
.reduce(Do merging samples);
在代码中,我们首先引入流的第二个字段作为事件时间,并设置周期性水印(因为数据是以固定的速率生成的,具有固定的延迟)。之后,我们将事件时间设置为流的关键。我们也想收集后期数据,所以我们使用了side OutputLateData
。最后,我们用相同的键减少(合并)数据。问题是在Flink事件时间模式下,定义的窗口不输出任何数据!如果不设置事件时间,它将输出数据,但我想使用事件时间作为Flin窗口的时间。我尝试了多次窗口和水印,但他们没有输出任何东西。
我使用Flink count窗口和自定义超时触发器成功地解决了相同的问题。
更新:传入的数据流格式为以下类型(如果我们有3个传感器):
sensor_id, timestamp, data
(1, 1531980773390, {})
(2, 1531980773390, {})
(3, 1531980773390, {})
(1, 1531980773398, {})
(2, 1531980773398, {})
(3, 1531980773398, {})
以此类推,每8毫秒一次。
在数据流中保存延迟数据
final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new OutputTag<Tuple3<String, Long, JSONObject>>("late-data") {
};
DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);
我认为你的问题是你按时间戳键。
您应该有keyBy(0),以便按传感器id分割流。
难道一切都晚了?
默认情况下,周期性水印生成器每200msec调用一次(实时测量,而不是事件时间)。在你的情况下,这可能相当长。使用setAutoWatermark Interval更改此值。您可能还需要考虑使用setBufferTimeout来减少延迟。
假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?
输入: 结果:
我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在
我的Flink工作必须在每次工作轮班后计算某个集合。换挡是可配置的,看起来类似于: 出于操作目的,每天的班次都是一样的,一周/一年中的几天之间没有区别。轮班配置可以随时间变化,并且可以不单调,因此表中留下了一个简单的EventTime窗口,如:,因为一些轮班可能会缩小或超时,或者在中间插入几个小时... 我想出了一些基于GlobalWindow和自定义触发器的东西: 在我的自定义触发器中,我尝试识