我想知道如何在Spark(Pyspark)中实现以下目标
初始数据框:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
结果数据框:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
我通常使用以下方法设法将新列“追加”到数据框: df.withColumn("new_Col", df.num * 10)
但是,我不知道如何为新列实现这种“行移位”,以便新列具有上一行的字段值(如示例中所示)。我也无法在API文档中找到有关如何通过索引访问DF中特定行的任何内容。
任何帮助,将不胜感激。
您可以lag
如下使用窗口功能
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
但是有一些重要的问题:
尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您应该简单地将其转换DataFrame
为RDD并lag
手动进行计算。
初始数据流: 产生的数据frame: 我通常通过使用以下内容将新列“追加”到dataframe:
我有一个如下的数据帧: 我想将其转换为如下内容: B中重复的值是列名称,其值在C列中。我希望它们是数据帧的列 事实上,数据集是通过展平一棵树创建的,其中有更多的列,每个列都是一个内部节点。第一列是根,C是叶 以下是我的一些尝试: 然而,它不起作用。
问题内容: 我试图基于一个的数据创建“ n” 。我正在检查in的Integer值,并循环执行sql语句以创建与列中一样多的“ n” 。 这是我的代码: 我需要创建“ n”,但我不知道如何在循环之前声明类型并在for内填充。 现有数据类型: 新的数据类型: 问题答案: 您可以创建一个可变列表并填充它: 但是更好的方法(不使用可变数据结构)是将整数列表 映射 到DataFrames列表中:
我有一个来自excel电子表格的数据框,其中我找到了每个域出现的频率。我想添加域频率计数到它的相应域。 下面是查找频率并尝试将其添加到相应域的代码。 当我从数据帧打印出频率时:
问题内容: 我有一个现有的数据框,我需要添加一个额外的列,每行将包含相同的值。 现有的df: 新的df: 我知道如何追加现有的series / dataframe列。但这是另一种情况,因为我所需要的只是添加“名称”列,并将每一行设置为相同的值,在本例中为“ abc”。 问题答案: 将添加新列并将所有行设置为该值:
我使用的是Spark 1.6,我想在数据帧中添加一列。新列实际上是一个常量序列:Seq(“-0”、“-1”、“-2”、“-3”) 这是我的原始数据帧: root--user\u name:string(nullable=true) |--test\u name:string(nullable=true) |user_name|test_name| ------------ ------------