当前位置: 首页 > 面试题库 >

Spark使用上一行的值将新列添加到数据框

吉俊德
2023-03-14
问题内容

我想知道如何在Spark(Pyspark)中实现以下目标

初始数据框:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

结果数据框:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

我通常使用以下方法设法将新列“追加”到数据框: df.withColumn("new_Col", df.num * 10)

但是,我不知道如何为新列实现这种“行移位”,以便新列具有上一行的字段值(如示例中所示)。我也无法在API文档中找到有关如何通过索引访问DF中特定行的任何内容。

任何帮助,将不胜感激。


问题答案:

您可以lag如下使用窗口功能

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

但是有一些重要的问题:

  1. 如果您需要全局操作(不被其他一个或多个其他列分区),则效率极低。
  2. 您需要一种自然的方式来订购数据。

尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您应该简单地将其转换DataFrame为RDD并lag手动进行计算。



 类似资料:
  • 初始数据流: 产生的数据frame: 我通常通过使用以下内容将新列“追加”到dataframe:

  • 我有一个如下的数据帧: 我想将其转换为如下内容: B中重复的值是列名称,其值在C列中。我希望它们是数据帧的列 事实上,数据集是通过展平一棵树创建的,其中有更多的列,每个列都是一个内部节点。第一列是根,C是叶 以下是我的一些尝试: 然而,它不起作用。

  • 问题内容: 我试图基于一个的数据创建“ n” 。我正在检查in的Integer值,并循环执行sql语句以创建与列中一样多的“ n” 。 这是我的代码: 我需要创建“ n”,但我不知道如何在循环之前声明类型并在for内填充。 现有数据类型: 新的数据类型: 问题答案: 您可以创建一个可变列表并填充它: 但是更好的方法(不使用可变数据结构)是将整数列表 映射 到DataFrames列表中:

  • 我有一个来自excel电子表格的数据框,其中我找到了每个域出现的频率。我想添加域频率计数到它的相应域。 下面是查找频率并尝试将其添加到相应域的代码。 当我从数据帧打印出频率时:

  • 问题内容: 我有一个现有的数据框,我需要添加一个额外的列,每行将包含相同的值。 现有的df: 新的df: 我知道如何追加现有的series / dataframe列。但这是另一种情况,因为我所需要的只是添加“名称”列,并将每一行设置为相同的值,在本例中为“ abc”。 问题答案: 将添加新列并将所有行设置为该值:

  • 我使用的是Spark 1.6,我想在数据帧中添加一列。新列实际上是一个常量序列:Seq(“-0”、“-1”、“-2”、“-3”) 这是我的原始数据帧: root--user\u name:string(nullable=true) |--test\u name:string(nullable=true) |user_name|test_name| ------------ ------------