当前位置: 首页 > 知识库问答 >
问题:

火花窗口自定义函数-获取分区记录总数

司徒高丽
2023-03-14

我有一个timeseries数据集,它由id分区,并由时间戳排序。示例:

  ID     Timestamp   Feature

 "XSC"   1986-05-21  44.7530
 "XSC"   1986-05-22  44.7530
 "XSC"   1986-05-23  23.5678

 "TM"    1982-03-08  22.2734
 "TM"    1982-03-09  22.1941
 "TM"    1982-03-10  22.0847
 "TM"    1982-03-11  22.1741
 "TM"    1982-03-12  22.1840
 "TM"    1982-03-15  22.1344

我有一些自定义逻辑,我需要计算,它应该在每个窗口,每个分区内完成。我知道Spark对窗口函数有丰富的支持,我正试图将其用于此目的。

val window = Window.partitionBy("id").orderBy("timestamp") 
frame = frame.withColumn("my_cnt", count(column).over(window))

我需要做一些类似的事情:

var i = 1
var y = col("Feature")
var result = y
while (i < /* total number of records within each partition goes here */) {
    result = result + lit(1) * lag(y, i).over(window) + /* complex computation */
    i = i + 1
}
dataFrame.withColumn("Computed_Value", result)

如何将每个分区中记录总数作为标量值获取?我还添加了计数“my_cnt”值,它添加了分区的总价值,但在我的情况下似乎无法使用它。

共有1个答案

东方镜
2023-03-14

Spark的collect_list函数允许您将窗口值聚合为列表。这个列表可以传递给UDF来执行一些复杂的计算

所以如果你有消息来源

val data = List(
  ("XSC", "1986-05-21", 44.7530),
  ("XSC", "1986-05-22", 44.7530),
  ("XSC", "1986-05-23", 23.5678),
  ("TM", "1982-03-08", 22.2734),
  ("TM", "1982-03-09", 22.1941),
  ("TM", "1982-03-10", 22.0847),
  ("TM", "1982-03-11", 22.1741),
  ("TM", "1982-03-12", 22.1840),
  ("TM", "1982-03-15", 22.1344),
).toDF("id", "timestamp", "feature")
  .withColumn("timestamp", to_date('timestamp))

和一些复杂的函数,包装在记录上的UDF中(例如表示为元组)

 val complexComputationUDF = udf((list: Seq[Row]) => {
  list
    .map(row => (row.getString(0), row.getDate(1).getTime, row.getDouble(2)))
    .sortBy(-_._2)
    .foldLeft(0.0) {
      case (acc, (id, timestamp, feature)) => acc + feature
    }
})
val windowAll = Window.partitionBy("id")
val windowRunning = Window.partitionBy("id").orderBy("timestamp")
val newData = data
  // I assuming thatyou need id,timestamp & feature for the complex computattion. So I create a struct
  .withColumn("record", struct('id, 'timestamp, 'feature))
  // Collect all records in the partition as a list of tuples and pass them to the complexComupation
  .withColumn("computedValueAll",
     complexComupationUDF(collect_list('record).over(windowAll)))
  // Collect records in a time ordered windows in the partition as a list of tuples and pass them to the complexComupation
  .withColumn("computedValueRunning",
     complexComupationUDF(collect_list('record).over(windowRunning)))

这将导致类似:

+---+----------+-------+--------------------------+------------------+--------------------+
|id |timestamp |feature|record                    |computedValueAll  |computedValueRunning|
+---+----------+-------+--------------------------+------------------+--------------------+
|XSC|1986-05-21|44.753 |[XSC, 1986-05-21, 44.753] |113.07379999999999|44.753              |
|XSC|1986-05-22|44.753 |[XSC, 1986-05-22, 44.753] |113.07379999999999|89.506              |
|XSC|1986-05-23|23.5678|[XSC, 1986-05-23, 23.5678]|113.07379999999999|113.07379999999999  |
|TM |1982-03-08|22.2734|[TM, 1982-03-08, 22.2734] |133.0447          |22.2734             |
|TM |1982-03-09|22.1941|[TM, 1982-03-09, 22.1941] |133.0447          |44.4675             |
|TM |1982-03-10|22.0847|[TM, 1982-03-10, 22.0847] |133.0447          |66.5522             |
|TM |1982-03-11|22.1741|[TM, 1982-03-11, 22.1741] |133.0447          |88.7263             |
|TM |1982-03-12|22.184 |[TM, 1982-03-12, 22.184]  |133.0447          |110.91029999999999  |
|TM |1982-03-15|22.1344|[TM, 1982-03-15, 22.1344] |133.0447          |133.0447            |
+---+----------+-------+--------------------------+------------------+--------------------+
 类似资料:
  • 我想用Apache Spark读入具有以下结构的文件。 csv太大了,不能使用熊猫,因为读取这个文件需要很长时间。有什么方法类似于 多谢!

  • The BrowserWindow module is the foundation of your Electron application, and it exposes many APIs that can change the look and behavior of your browser windows. In this tutorial, we will be going over

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 这个问题与这个主题有关: Spark 2.2 Scala 数据帧从字符串数组中选择,捕获错误 我需要区分缺少列的记录(这在我的用例中不是错误)和具有不适用于列类型的垃圾值的记录。 在执行selectExpr之后,这两种情况在结果数据帧中都显示为null。我正在寻找一种快速的方法,将缺少列的记录包含在好的结果中,同时将具有垃圾值的记录放入坏桶中。不好的可能包括像一个值为空字符串的int字段,或者“a

  • 我需要一个窗口函数,该函数按一些键(=列名)进行分区,按另一个列名排序,并返回排名前x的行。 这适用于升序: 但当我试图在第4行中将其更改为或时,我得到了一个语法错误。这里的正确语法是什么?