嗨,我是Spark/Scala的新手,我一直在尝试-AKA失败,根据特定的递归公式在火花数据帧中创建一列:
这里是伪代码。
someDf.col2[0] = 0
for i > 0
someDf.col2[i] = x * someDf.col1[i-1] + (1-x) * someDf.col2[i-1]
为了深入了解更多细节,这里是我的出发点:这个数据帧是在日期
和个人id
级别上聚合的结果。
所有进一步的计算都必须针对特定的id
,并且必须考虑到前一周发生的事情。
为了说明这一点,我将这些值简化为0和1,删除了乘法器x
和1-x
,并将col2
初始化为零。
var someDf = Seq(("2016-01-10 00:00:00.0","385608",0,0),
("2016-01-17 00:00:00.0","385608",0,0),
("2016-01-24 00:00:00.0","385608",1,0),
("2016-01-31 00:00:00.0","385608",1,0),
("2016-02-07 00:00:00.0","385608",1,0),
("2016-02-14 00:00:00.0","385608",1,0),
("2016-01-17 00:00:00.0","105010",0,0),
("2016-01-24 00:00:00.0","105010",1,0),
("2016-01-31 00:00:00.0","105010",0,0),
("2016-02-07 00:00:00.0","105010",1,0)
).toDF("dates", "id", "col1","col2" )
someDf.show()
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 0|
|2016-02-07 00:00:...|385608| 1| 0|
|2016-02-14 00:00:...|385608| 1| 0|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 0|
|2016-02-07 00:00:...|105010| 1| 0|
+--------------------+------+----+----+
到目前为止我所尝试的与所期望的
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2", lag($"col1",1 ).over(date_id_window) +
lag($"col2",1 ).over(date_id_window) ).show()
+--------------------+------+----+----+ / +--------------------+
| dates| id|col1|col2| / | what_col2_should_be|
+--------------------+------+----+----+ / +--------------------+
|2016-01-17 00:00:...|105010| 0|null| / | 0|
|2016-01-24 00:00:...|105010| 1| 0| / | 0|
|2016-01-31 00:00:...|105010| 0| 1| / | 1|
|2016-02-07 00:00:...|105010| 1| 0| / | 1|
+-------------------------------------+ / +--------------------+
|2016-01-10 00:00:...|385608| 0|null| / | 0|
|2016-01-17 00:00:...|385608| 0| 0| / | 0|
|2016-01-24 00:00:...|385608| 1| 0| / | 0|
|2016-01-31 00:00:...|385608| 1| 1| / | 1|
|2016-02-07 00:00:...|385608| 1| 1| / | 2|
|2016-02-14 00:00:...|385608| 1| 1| / | 3|
+--------------------+------+----+----+ / +--------------------+
有没有办法做到这一点与火花数据帧,我见过多个累积类型计算,但从来没有包括相同的列,我相信问题是,行i-1的新计算值没有被考虑,而是使用旧的i-1总是0。
任何帮助都将不胜感激。
您应该将转换应用于数据框,而不是将其视为var
。获取所需内容的一种方法是使用Windows的row
对每个窗口分区内的行通过前一行(即row-1
)累积求和col1
的值:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy("id").orderBy("dates").rowsBetween(Long.MinValue, -1)
val newDF = someDf.
withColumn(
"col2", sum($"col1").over(window)
).withColumn(
"col2", when($"col2".isNull, 0).otherwise($"col2")
).orderBy("id", "dates")
newDF.show
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 1|
|2016-02-07 00:00:...|105010| 1| 1|
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 1|
|2016-02-07 00:00:...|385608| 1| 2|
|2016-02-14 00:00:...|385608| 1| 3|
+--------------------+------+----+----+
你可以使用rows
api和你正在使用的Window
函数,你应该有想要的输出
val date_id_window = Window.partitionBy("id").orderBy(asc("dates"))
someDf.withColumn("col2", sum(lag($"col1", 1).over(date_id_window)).over(date_id_window.rowsBetween(Long.MinValue, 0)))
.withColumn("col2", when($"col2".isNull, lit(0)).otherwise($"col2"))
.show()
给定输入dataframe
as
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 0|
|2016-02-07 00:00:...|385608| 1| 0|
|2016-02-14 00:00:...|385608| 1| 0|
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 0|
|2016-02-07 00:00:...|105010| 1| 0|
+--------------------+------+----+----+
应用上述逻辑后,您应该有输出数据帧
+--------------------+------+----+----+
| dates| id|col1|col2|
+--------------------+------+----+----+
|2016-01-17 00:00:...|105010| 0| 0|
|2016-01-24 00:00:...|105010| 1| 0|
|2016-01-31 00:00:...|105010| 0| 1|
|2016-02-07 00:00:...|105010| 1| 1|
|2016-01-10 00:00:...|385608| 0| 0|
|2016-01-17 00:00:...|385608| 0| 0|
|2016-01-24 00:00:...|385608| 1| 0|
|2016-01-31 00:00:...|385608| 1| 1|
|2016-02-07 00:00:...|385608| 1| 2|
|2016-02-14 00:00:...|385608| 1| 3|
+--------------------+------+----+----+
我希望答案是有帮助的
数据集
应该可以正常工作:
val x = 0.1
case class Record(dates: String, id: String, col1: Int)
someDf.drop("col2").as[Record].groupByKey(_.id).flatMapGroups((_, records) => {
val sorted = records.toSeq.sortBy(_.dates)
sorted.scanLeft((null: Record, 0.0)){
case ((_, col2), record) => (record, x * record.col1 + (1 - x) * col2)
}.tail
}).select($"_1.*", $"_2".alias("col2"))
我正在做一个关于象棋游戏的项目。在对数据进行一些处理之后,我需要得到一个特定位置的芬(https://en.wikipedia.org/wiki/Forsyth–Edwards_Notation)符号。我已经写好了每块FEN编码的代码,但是我很难对代表未被占据的连续方块的数量的字符进行编码。 例如,以以下FEN代码为例: 每个1代表棋盘内一个未被占用的方块。例如:告诉我们棋盘内的这一行没有被棋子占
问题内容: 我有一个看起来像这样的表: 我想添加一个新的列,称为cumulative_sum,因此表如下所示: 是否有可以轻松完成此操作的MySQL更新语句?做到这一点的最佳方法是什么? 问题答案: 如果性能是一个问题,则可以使用MySQL变量: 或者,您可以删除该列并在每个查询中对其进行计算: 这以运行方式计算运行总和:)
问题内容: 对于Java语言有些陌生,我试图使自己熟悉所有可能遍历列表(或其他集合)的方式(或至少是非病理性方式)以及每种方式的优缺点。 给定一个对象,我知道以下遍历所有元素的方式: 基本的for 循环(当然,也有等效的while/ do while循环) 注意:正如@amarseillan指出的那样,这种形式对于在s上进行迭代是一个糟糕的选择,因为该方法的实际实现可能不如使用时有效。例如,实
由于对Java语言有些陌生,我正在尝试熟悉所有可以通过列表(或者其他集合)进行迭代的方法(或者至少是非病态的方法),以及每种方法的优缺点。 给定一个对象,我知道以下循环所有元素的方法: 注意:正如@Amarseillan所指出的,对于迭代s,此表单是一个糟糕的选择,因为方法的实际实现可能不如使用时那样高效。例如,实现必须遍历i之前的所有元素以获得第i个元素。 在上面的示例中,实现没有办法“保留它的
我有一个自定义模板类- 我想要一个常量迭代器开始于myClass,常量迭代器结束于myClass,它能够迭代myClass矩阵中的对象T,我正在努力创建这样的东西。 在我看来,我想把矩阵上的所有对象T聚集到某个局部一维向量,然后返回迭代器。从这个向量或迭代器开始。结束这个向量 此外,我希望能够支持for-each循环如下: 谢谢!
问题内容: 我试图弄清楚如何将累积函数应用于对象。对于数字,有多种选择,例如和。还有df.expanding可以与一起使用。但是我传递给我的功能不适用于对象。 在数据框中,我具有整数值,集合,字符串和列表。现在,如果我尝试一下,我有累加的总和: 我的期望是,由于求和是在列表和字符串上定义的,所以我会得到如下信息: 我也尝试过这样的事情: 它按我的预期工作:以前的结果是,当前行的值是。但是例如,我不