当前位置: 首页 > 知识库问答 >
问题:

Flink中最大时间窗口的更新流

怀晋
2023-03-14
source
  .keyBy(...)
  .timeWindow(...)
  .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
  .keyBy(_ => ())
  .maxBy(1)

fold的结果是一个(key,count)元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。

然后我通过一个常量(keyby(_=>())-因为这是一个全局操作)进行键控,并使用maxby-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。

我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。

目前在Flink有可能吗?

共有1个答案

骆磊
2023-03-14

Flink默认情况下没有这样的过滤器,但自己实现一个应该相当容易。

您可以使用类似于以下的有状态flatmap来完成此操作:

val source: DataStream[Int] = ???

source
  .keyBy(_: Int => _)
  .timeWindow(Time.minutes(10))
  .fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
  // move everything to the same key
  .keyBy(_ => 0) 
  // use stateful flatmap to remember highest count and filter by that
  .flatMapWithState( (in, state: Option[Int]) => 
    // filter condition
    if (in._2 > state.getOrElse(-1)) 
      // emit new value and update max count
      (Seq(in), Some(in._2)) 
    else 
      // emit nothing (empty Seq()) and keep count
      (Seq(), state)
  ).setParallelism(1)

如果非并行(单线程)筛选器运算符成为瓶颈,您可以通过添加具有随机键的KeyBy和具有更高并行度的有状态筛选器FlatMap来添加并行预筛选器。

 类似资料:
  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢

  • 我有一个流是消费的Flink Kafka消费者将加入另一个流为定义的窗口大小,如Time.milliseconds(10000)。 如何在运行时将窗口大小更改为Time.milliseconds(20000)?

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?