我在Kafka有这样的数据:
{
"account": "iOS", //Possible values: iOS, android, web, windows
"events": [
{
"timestamp": "2017-07-03T20:19:35Z"
}
]
}
时间戳从2017-07-03T20:19:35Z
到2017-07-03T20:22:30Z
(大约3分钟)。我有一个Flink程序,它吸收了上面Kafka主题的数据:
object TestWindow {
def main(args: Array[String]) = {
val props = new Properties()
props.put(...)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props)).
keyBy(jsonStr => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonStr).getAsJsonObject()
jsonObject.get("account")
}).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce({(v1, v2) =>
println(v1 + " " + v2)
""
})
env.execute()
}
}
我很难理解窗口化是如何与keyBy
操作一起工作的。
我知道上面的keyBy
操作将创建包含相同键元素的不同分区。但我不知道什么时候创建窗口,以及如何将每个分区添加到哪个窗口?
我的猜测是,当每个JSON对象到达时,它将由keyBy
操作键入,然后根据该JSON对象的事件时间戳
创建一个10秒的窗口?
因此,例如,如果一个时间戳为“2017-07-03T20:19:35Z”的JSON对象到达并拥有一个帐户类型的iOS,那么将为iOS创建一个键控分区,并为
“2017-07-03T20:19:35Z”
到“2017-07-03T20:19:45Z”
创建一个窗口?还是每10秒创建一个窗口,而不考虑来自Kafka的JSON对象的事件时间戳?
您关于keyBy
的假设是正确的keyBy
根据定义的键属性对流进行分区,并根据每个键计算窗口。
在示例中使用的TumblingEventTimeWindow
具有固定的窗口边框,即边框不依赖于数据的时间戳。10秒的滚动窗口将从[00:00:00.000, 00:00:10.000)
,[00:00:10.000, 00:00:20.000)
等创建窗口。到达窗口操作符的记录将被分配给与其时间戳相交的窗口。当窗口关闭时(操作员的本地时间通过窗口的结束时间戳),窗口的结果被计算并发出。请注意,窗口仅使用第一条记录实例化,即空窗口不会触发计算,也不能发出数据,例如零计数。
其他窗口类型,如会话窗口,具有数据驱动的边界。在会话窗口的情况下,所有彼此间隔不超过一段时间的记录都被分组到一个窗口中。
我正在学习flink,试图理解一些概念。以下是几个问题: 对流的操作与从像这样的儿童获取源代码有什么区别?这两个操作都分割流。 还尝试实现一个非常简单的keyBy操作符来理解它,如下所示: 但我得到的输出令人困惑: 这意味着在子任务3上执行的所有内容。有人能帮忙解释一下原因吗?
我的flink作业有keyBy操作符,它将date~clientID(date为yyyymmddhmm,MM为5分钟后更改的分钟)作为键。这个操作员之后是5分钟的翻滚窗口。我们的Kafka输入平均为每分钟300万个事件,在峰值时间大约为每分钟2000万个事件。检查点持续时间和两次检查点之间的最小暂停时间为3分钟。 现在我的疑虑是: 1)keyBy创建的状态是永久保持还是在5分钟后被驱逐。 5)我每
Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数 尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不
我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。 我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。 my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口 记录通过 然后我通过 组中的第一个窗口转换为7:00-7:05 当我通过控制台消费者
Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数 1.窗口的长度 - 窗口的长度或持续时间 2.滑动间隔 - 窗口滑动的时间间隔 滑动窗口 元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。 例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 ........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |..
所以我正在用pygame做一个游戏,我也想用tkinter。我在tkinter窗口中嵌入了一个pygame窗口,但我似乎什么都做不了。 对于上下文,以下是完整的代码: 当我使用时,什么都不会发生。在类中使用pyplay是有效的,但是在我更复杂的游戏中,为所有变量使用self.variable似乎是不必要的。 如何在window类之外的pygame窗口中运行代码?