当前位置: 首页 > 知识库问答 >
问题:

窗口操作如何与Flink中的key By一起工作?

黄君博
2023-03-14

我在Kafka有这样的数据:

{
  "account": "iOS", //Possible values: iOS, android, web, windows
  "events": [
    {
      "timestamp": "2017-07-03T20:19:35Z"
    }
  ]
}

时间戳从2017-07-03T20:19:35Z2017-07-03T20:22:30Z(大约3分钟)。我有一个Flink程序,它吸收了上面Kafka主题的数据:

object TestWindow {
  def main(args: Array[String]) = {
    val props = new Properties()
    props.put(...)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props)).
      keyBy(jsonStr => {
        val jsonParser = new JsonParser()
        val jsonObject = jsonParser.parse(jsonStr).getAsJsonObject()
        jsonObject.get("account")
    }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce({(v1, v2) =>
        println(v1 + " " + v2)
        ""
      })
    env.execute()
  }
}

我很难理解窗口化是如何与keyBy操作一起工作的。

我知道上面的keyBy操作将创建包含相同键元素的不同分区。但我不知道什么时候创建窗口,以及如何将每个分区添加到哪个窗口?

我的猜测是,当每个JSON对象到达时,它将由keyBy操作键入,然后根据该JSON对象的事件时间戳创建一个10秒的窗口?

因此,例如,如果一个时间戳为“2017-07-03T20:19:35Z”的JSON对象到达并拥有一个帐户类型的iOS,那么将为iOS创建一个键控分区,并为“2017-07-03T20:19:35Z”“2017-07-03T20:19:45Z”创建一个窗口?还是每10秒创建一个窗口,而不考虑来自Kafka的JSON对象的事件时间戳?


共有1个答案

卢书
2023-03-14

您关于keyBy的假设是正确的keyBy根据定义的键属性对流进行分区,并根据每个键计算窗口。

在示例中使用的TumblingEventTimeWindow具有固定的窗口边框,即边框不依赖于数据的时间戳。10秒的滚动窗口将从[00:00:00.000, 00:00:10.000)[00:00:10.000, 00:00:20.000)等创建窗口。到达窗口操作符的记录将被分配给与其时间戳相交的窗口。当窗口关闭时(操作员的本地时间通过窗口的结束时间戳),窗口的结果被计算并发出。请注意,窗口仅使用第一条记录实例化,即空窗口不会触发计算,也不能发出数据,例如零计数。

其他窗口类型,如会话窗口,具有数据驱动的边界。在会话窗口的情况下,所有彼此间隔不超过一段时间的记录都被分组到一个窗口中。

 类似资料:
  • 我正在学习flink,试图理解一些概念。以下是几个问题: 对流的操作与从像这样的儿童获取源代码有什么区别?这两个操作都分割流。 还尝试实现一个非常简单的keyBy操作符来理解它,如下所示: 但我得到的输出令人困惑: 这意味着在子任务3上执行的所有内容。有人能帮忙解释一下原因吗?

  • 我的flink作业有keyBy操作符,它将date~clientID(date为yyyymmddhmm,MM为5分钟后更改的分钟)作为键。这个操作员之后是5分钟的翻滚窗口。我们的Kafka输入平均为每分钟300万个事件,在峰值时间大约为每分钟2000万个事件。检查点持续时间和两次检查点之间的最小暂停时间为3分钟。 现在我的疑虑是: 1)keyBy创建的状态是永久保持还是在5分钟后被驱逐。 5)我每

  • Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数 尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不

  • 我很难理解窗口在Kafka Streams中是如何工作的。到目前为止,结果似乎与我所阅读和理解的不一致。 我已经创建了一个带有支持主题的KSQL流。KSQL SELECT语句中的“列”之一已被指定为该主题的TIMESTAMP。 my-stream主题中的记录按键(PARTITION_KEY)分组,并用跳转窗口窗口 记录通过 然后我通过 组中的第一个窗口转换为7:00-7:05 当我通过控制台消费者

  • Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数 1.窗口的长度 - 窗口的长度或持续时间 2.滑动间隔 - 窗口滑动的时间间隔 滑动窗口 元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。 例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 ........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |..

  • 所以我正在用pygame做一个游戏,我也想用tkinter。我在tkinter窗口中嵌入了一个pygame窗口,但我似乎什么都做不了。 对于上下文,以下是完整的代码: 当我使用时,什么都不会发生。在类中使用pyplay是有效的,但是在我更复杂的游戏中,为所有变量使用self.variable似乎是不必要的。 如何在window类之外的pygame窗口中运行代码?