当前位置: 首页 > 知识库问答 >
问题:

Apache Flink[在scala中]计算流数据的分位数

单于扬
2023-03-14

我想用Scala计算Flink中流数据的分位数。我的问题类似于但比这一个更简单,flink计算中位数。我认为这可以通过定义一个自定义聚合函数来实现,但我正在寻找一些Scala示例。我已经看了本章中的例子https://github.com/dataArtisans/flink-training-exercises但是没有完全找到我要找的东西。我计算了总和,平均值,我想计算第95个百分位数。

val nwStream = env
  // TestData topic is our Kafka topic
  .addSource(kafkaConsumer)
  // configure timestamp and watermark assigner
  .assignTimestampsAndWatermarks(new TestDataTSAssigner)
  // group by stats by
  .keyBy(_.sSomeId)
  // sliding window is 5 minutes long and slides every 1 minute
  .timeWindow(Time.minutes(5), Time.minutes(1))
  .apply { (key: String, window: TimeWindow, events: Iterable[TestData],
            out: Collector[(String, Long, Long, Double, Double)]) =>
  out.collect((key, window.getEnd, events.size,
    events.map(_.stat1).sum/events.size,
    events.map(_.stat2).sum/events.size)
}

我希望能够在collect函数中以类似的方式计算第95个百分位数。难道没有办法用平面图来做这件事吗?如果我们能说

events.map(_.stat1).quantile(0.95)

但我知道现在没有内置的分位数函数。

任何帮助都将不胜感激。

共有1个答案

钱瑞
2023-03-14

跨整个流的完全精确的分位数/百分位数计算需要保持整个流处于状态,这一点根本不可伸缩。我建议使用类似于t-digest草图的东西来进行估计。

我不知道有谁对Flink这样做过,但这应该是相当直接的。

 类似资料:
  • 我正在尝试为ApacheFlink导入ScalaAPI流扩展,如中所述https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html 但是,我的ScalaIDE抱怨以下消息:对象扩展不是包的成员org.apache.flink.streaming.api.scala 我使用的是scala 2

  • 问题内容: 我正在尝试从DataFrame计算列中每个值的百分位数。 有没有更好的方法来编写以下代码? 我希望看到更好的性能。 问题答案: 似乎您想要: 性能:

  • 我有大量的数据( 另外,是否是合适的数据结构?或者另一种数据结构会提供更好的复杂性 注意:我不能使用,因为如果使用,也可能存在重复项。查找中值将增加复杂性,因为我将从开始到中间循环以获取其值。

  • 我想编写一个具有重分区的大型数据帧,所以我想计算源数据帧的重分区数。 数据帧/default_blocksize的大小 所以请告诉我如何在spark scala中计算数据帧的大小 提前谢谢。

  • 我正在尝试在旁边使用值方法。 不幸的是,编译器说不兼容的类型。 如果我将s更改为s,它仍然不喜欢它。