当前位置: 首页 > 知识库问答 >
问题:

在Spark Scala中的列上运行累积/迭代Costum方法

轩辕翰
2023-03-14

嗨,我是Spark/Scala的新手,我一直在尝试-AKA失败,根据特定的递归公式在火花数据帧中创建一列:

这里是伪代码。

someDf.col2[0] = 0

for i > 0
someDf.col2[i] = x * someDf.col1[i-1] + (1-x) * someDf.col2[i-1]

为了深入了解更多细节,这里是我的出发点:这个数据帧是在日期和个人id级别上聚合的结果。

所有进一步的计算都必须针对特定的id,并且必须考虑到前一周发生的事情。

为了说明这一点,我将这些值简化为0和1,删除了乘法器x1-x,并将col2初始化为零。

var someDf = Seq(("2016-01-10 00:00:00.0","385608",0,0), 
         ("2016-01-17 00:00:00.0","385608",0,0),
         ("2016-01-24 00:00:00.0","385608",1,0),
         ("2016-01-31 00:00:00.0","385608",1,0),
         ("2016-02-07 00:00:00.0","385608",1,0),
         ("2016-02-14 00:00:00.0","385608",1,0),
         ("2016-01-17 00:00:00.0","105010",0,0),
         ("2016-01-24 00:00:00.0","105010",1,0),
         ("2016-01-31 00:00:00.0","105010",0,0),
         ("2016-02-07 00:00:00.0","105010",1,0)
        ).toDF("dates", "id", "col1","col2" )

someDf.show()
+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   0|
|2016-02-07 00:00:...|385608|   1|   0|
|2016-02-14 00:00:...|385608|   1|   0|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   0|
|2016-02-07 00:00:...|105010|   1|   0|
+--------------------+------+----+----+

到目前为止我所尝试的与所期望的

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val date_id_window = Window.partitionBy("id").orderBy(asc("dates")) 

someDf.withColumn("col2", lag($"col1",1 ).over(date_id_window) + 
lag($"col2",1 ).over(date_id_window) ).show() 
+--------------------+------+----+----+ / +--------------------+
|               dates|    id|col1|col2| / | what_col2_should_be|
+--------------------+------+----+----+ / +--------------------+
|2016-01-17 00:00:...|105010|   0|null| / |                   0| 
|2016-01-24 00:00:...|105010|   1|   0| / |                   0|
|2016-01-31 00:00:...|105010|   0|   1| / |                   1|
|2016-02-07 00:00:...|105010|   1|   0| / |                   1|
+-------------------------------------+ / +--------------------+
|2016-01-10 00:00:...|385608|   0|null| / |                   0|
|2016-01-17 00:00:...|385608|   0|   0| / |                   0|
|2016-01-24 00:00:...|385608|   1|   0| / |                   0|
|2016-01-31 00:00:...|385608|   1|   1| / |                   1|
|2016-02-07 00:00:...|385608|   1|   1| / |                   2|
|2016-02-14 00:00:...|385608|   1|   1| / |                   3|
+--------------------+------+----+----+ / +--------------------+

有没有办法做到这一点与火花数据帧,我见过多个累积类型计算,但从来没有包括相同的列,我相信问题是,行i-1的新计算值没有被考虑,而是使用旧的i-1总是0。

任何帮助都将不胜感激。

共有3个答案

锺英卫
2023-03-14

您应该将转换应用于数据框,而不是将其视为var。获取所需内容的一种方法是使用Windows的row对每个窗口分区内的行通过前一行(即row-1)累积求和col1的值:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Long.MinValue, -1)

val newDF = someDf.
  withColumn(
    "col2", sum($"col1").over(window)
  ).withColumn(
    "col2", when($"col2".isNull, 0).otherwise($"col2")
  ).orderBy("id", "dates")

newDF.show
+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   1|
|2016-02-07 00:00:...|105010|   1|   1|
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   1|
|2016-02-07 00:00:...|385608|   1|   2|
|2016-02-14 00:00:...|385608|   1|   3|
+--------------------+------+----+----+
陈博容
2023-03-14

你可以使用rowsapi和你正在使用的Window函数,你应该有想要的输出

val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2", sum(lag($"col1", 1).over(date_id_window)).over(date_id_window.rowsBetween(Long.MinValue, 0)))
  .withColumn("col2", when($"col2".isNull, lit(0)).otherwise($"col2"))
  .show()

给定输入dataframeas

+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   0|
|2016-02-07 00:00:...|385608|   1|   0|
|2016-02-14 00:00:...|385608|   1|   0|
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   0|
|2016-02-07 00:00:...|105010|   1|   0|
+--------------------+------+----+----+

应用上述逻辑后,您应该有输出数据帧

+--------------------+------+----+----+
|               dates|    id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010|   0|   0|
|2016-01-24 00:00:...|105010|   1|   0|
|2016-01-31 00:00:...|105010|   0|   1|
|2016-02-07 00:00:...|105010|   1|   1|
|2016-01-10 00:00:...|385608|   0|   0|
|2016-01-17 00:00:...|385608|   0|   0|
|2016-01-24 00:00:...|385608|   1|   0|
|2016-01-31 00:00:...|385608|   1|   1|
|2016-02-07 00:00:...|385608|   1|   2|
|2016-02-14 00:00:...|385608|   1|   3|
+--------------------+------+----+----+

我希望答案是有帮助的

董飞航
2023-03-14

数据集应该可以正常工作:

val x = 0.1

case class Record(dates: String, id: String, col1: Int)

someDf.drop("col2").as[Record].groupByKey(_.id).flatMapGroups((_,  records) => {
  val sorted = records.toSeq.sortBy(_.dates)
  sorted.scanLeft((null: Record, 0.0)){
    case ((_, col2), record) => (record, x * record.col1 + (1 - x) * col2)
  }.tail
}).select($"_1.*", $"_2".alias("col2"))
 类似资料:
  • 我正在做一个关于象棋游戏的项目。在对数据进行一些处理之后,我需要得到一个特定位置的芬(https://en.wikipedia.org/wiki/Forsyth–Edwards_Notation)符号。我已经写好了每块FEN编码的代码,但是我很难对代表未被占据的连续方块的数量的字符进行编码。 例如,以以下FEN代码为例: 每个1代表棋盘内一个未被占用的方块。例如:告诉我们棋盘内的这一行没有被棋子占

  • 问题内容: 我有一个看起来像这样的表: 我想添加一个新的列,称为cumulative_sum,因此表如下所示: 是否有可以轻松完成此操作的MySQL更新语句?做到这一点的最佳方法是什么? 问题答案: 如果性能是一个问题,则可以使用MySQL变量: 或者,您可以删除该列并在每个查询中对其进行计算: 这以运行方式计算运行总和:)

  • 问题内容: 对于Java语言有些陌生,我试图使自己熟悉所有可能遍历列表(或其他集合)的方式(或至少是非病理性方式)以及每种方式的优缺点。 给定一个对象,我知道以下遍历所有元素的方式: 基本的for 循环(当然,也有等效的while/ do while循环) 注意:正如@a​​marseillan指出的那样,这种形式对于在s上进行迭代是一个糟糕的选择,因为该方法的实际实现可能不如使用时有效。例如,实

  • 由于对Java语言有些陌生,我正在尝试熟悉所有可以通过列表(或者其他集合)进行迭代的方法(或者至少是非病态的方法),以及每种方法的优缺点。 给定一个对象,我知道以下循环所有元素的方法: 注意:正如@Amarseillan所指出的,对于迭代s,此表单是一个糟糕的选择,因为方法的实际实现可能不如使用时那样高效。例如,实现必须遍历i之前的所有元素以获得第i个元素。 在上面的示例中,实现没有办法“保留它的

  • 我有一个自定义模板类- 我想要一个常量迭代器开始于myClass,常量迭代器结束于myClass,它能够迭代myClass矩阵中的对象T,我正在努力创建这样的东西。 在我看来,我想把矩阵上的所有对象T聚集到某个局部一维向量,然后返回迭代器。从这个向量或迭代器开始。结束这个向量 此外,我希望能够支持for-each循环如下: 谢谢!

  • 问题内容: 我试图弄清楚如何将累积函数应用于对象。对于数字,有多种选择,例如和。还有df.expanding可以与一起使用。但是我传递给我的功能不适用于对象。 在数据框中,我具有整数值,集合,字符串和列表。现在,如果我尝试一下,我有累加的总和: 我的期望是,由于求和是在列表和字符串上定义的,所以我会得到如下信息: 我也尝试过这样的事情: 它按我的预期工作:以前的结果是,当前行的值是。但是例如,我不