source
.keyBy(...)
.timeWindow(...)
.fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
.keyBy(_ => ())
.maxBy(1)
fold
的结果是一个(key,count)
元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。
然后我通过一个常量(keyby(_=>())
-因为这是一个全局操作)进行键控,并使用maxby
-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。
我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。
目前在Flink有可能吗?
Flink默认情况下没有这样的过滤器,但自己实现一个应该相当容易。
您可以使用类似于以下的有状态flatmap
来完成此操作:
val source: DataStream[Int] = ???
source
.keyBy(_: Int => _)
.timeWindow(Time.minutes(10))
.fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
// move everything to the same key
.keyBy(_ => 0)
// use stateful flatmap to remember highest count and filter by that
.flatMapWithState( (in, state: Option[Int]) =>
// filter condition
if (in._2 > state.getOrElse(-1))
// emit new value and update max count
(Seq(in), Some(in._2))
else
// emit nothing (empty Seq()) and keep count
(Seq(), state)
).setParallelism(1)
如果非并行(单线程)筛选器运算符成为瓶颈,您可以通过添加具有随机键的KeyBy
和具有更高并行度的有状态筛选器FlatMap
来添加并行预筛选器。
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
我有一个流是消费的Flink Kafka消费者将加入另一个流为定义的窗口大小,如Time.milliseconds(10000)。 如何在运行时将窗口大小更改为Time.milliseconds(20000)?
假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我
当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?
输入: 结果: