当前位置: 首页 > 知识库问答 >
问题:

如何根据先前记录的值更新spark数据框的列

谢财
2023-03-14

我在 df 中有三列

Col1,col2,col3

X,x1,x2

Z,z1,z2

Y,

X,x3,x4

P,p1,p2

Q,q1,q2

Y

我想在col1=x时执行以下操作,存储col2和col3的值,并在col1=y预期输出时将这些列值分配给下一行

X,x1,x2

Z,z1,z2

Y,x1,x2

X,x3,x4

P,p1,p2

Q,q1,q2

Y,x3,x4

任何帮助将不胜感激 注意:-火花 1.6

共有2个答案

冯和硕
2023-03-14

是的,有一个需要排序的滞后函数

import org.apache.spark.sql.expressions.Window.orderBy
import org.apache.spark.sql.functions.{coalesce, lag}

case class Temp(a: String, b: Option[String], c: Option[String])

val input = ss.createDataFrame(
  Seq(
    Temp("A", Some("a1"), Some("a2")),
    Temp("D", Some("d1"), Some("d2")),
    Temp("B", Some("b1"), Some("b2")),
    Temp("E", None, None),
    Temp("C", None, None)
  ))

+---+----+----+
|  a|   b|   c|
+---+----+----+
|  A|  a1|  a2|
|  D|  d1|  d2|
|  B|  b1|  b2|
|  E|null|null|
|  C|null|null|
+---+----+----+

val order = orderBy($"a")
input
  .withColumn("b", coalesce($"b", lag($"b", 1).over(order)))
  .withColumn("c", coalesce($"c", lag($"c", 1).over(order)))
  .show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  A| a1| a2|
|  B| b1| b2|
|  C| b1| b2|
|  D| d1| d2|
|  E| d1| d2|
+---+---+---+
慕容文昌
2023-03-14

这是一种使用Windows函数的方法,步骤如下:

    < li >添加行标识列(如果已经有一列,则不需要)并将非键列(可能是许多列)合并为一列 < li >使用条件空值创建< code>tmp1,使用< code>last/rowsBetween窗口函数使用最后一个非空值回填创建< code>tmp2 < li >根据< code>cols和< code>tmp2有条件地创建< code>newcols < li >使用< code>foldLeft将< code>newcols扩展回各个列

请注意,此解决方案使用窗口函数而不进行分区,因此可能不适用于大型数据集。

val df = Seq(
  ("X", "x1", "x2"),
  ("Z", "z1", "z2"),
  ("Y", "", ""),
  ("X", "x3", "x4"),
  ("P", "p1", "p2"),
  ("Q", "q1", "q2"),
  ("Y", "", "")
).toDF("col1", "col2", "col3")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val colList = df.columns.filter(_ != "col1")

val df2 = df.select($"col1", monotonically_increasing_id.as("id"),
  struct(colList.map(col): _*).as("cols")
)

val df3 = df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  withColumn( "tmp2", last("tmp1", ignoreNulls = true).over(
    Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)
  ) )

df3.show
// +----+---+-------+-------+-------+
// |col1| id|   cols|   tmp1|   tmp2|
// +----+---+-------+-------+-------+
// |   X|  0|[x1,x2]|[x1,x2]|[x1,x2]|
// |   Z|  1|[z1,z2]|   null|[x1,x2]|
// |   Y|  2|    [,]|   null|[x1,x2]|
// |   X|  3|[x3,x4]|[x3,x4]|[x3,x4]|
// |   P|  4|[p1,p2]|   null|[x3,x4]|
// |   Q|  5|[q1,q2]|   null|[x3,x4]|
// |   Y|  6|    [,]|   null|[x3,x4]|
// +----+---+-------+-------+-------+

val df4 = df3.withColumn( "newcols",
  when($"col1" === "Y", $"tmp2").otherwise($"cols")
).select($"col1", $"newcols")

df4.show
// +----+-------+
// |col1|newcols|
// +----+-------+
// |   X|[x1,x2]|
// |   Z|[z1,z2]|
// |   Y|[x1,x2]|
// |   X|[x3,x4]|
// |   P|[p1,p2]|
// |   Q|[q1,q2]|
// |   Y|[x3,x4]|
// +----+-------+

val dfResult = colList.foldLeft( df4 )(
  (accDF, c) => accDF.withColumn(c, df4(s"newcols.$c"))
).drop($"newcols")

dfResult.show
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |   X|  x1|  x2|
// |   Z|  z1|  z2|
// |   Y|  x1|  x2|
// |   X|  x3|  x4|
// |   P|  p1|  p2|
// |   Q|  q1|  q2|
// |   Y|  x3|  x4|
// +----+----+----+

[更新]

对于Spark 1.x,< code>last(colName,ignoreNulls)在DataFrame API中不可用。一个解决方法是恢复使用Spark SQL,该SQL在其last()方法中支持ignore-null:

df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  createOrReplaceTempView("df2table")
  // might need to use registerTempTable("df2table") instead

val df3 = spark.sqlContext.sql("""
  select col1, id, cols, tmp1, last(tmp1, true) over (
    order by id rows between unbounded preceding and current row
    ) as tmp2
  from df2table
""")
 类似资料:
  • 我有两个火花数据集,其中一个列的帐户和键,键列在数组的格式[key1, key2, key3...]和另一个数据集的两个列的帐户和键值是在json.帐户,{key:值,键,值...}。我需要更新第二个数据集中的值,如果键出现在第一个数据集中。 预期产出

  • 问题内容: 查看新的spark数据框api,尚不清楚是否可以修改数据框列。 我怎么会去改变行的值列一个数据帧的? 在这将是 编辑:合并以下内容,您不能修改现有数据框,因为它是不可变的,但是您可以返回具有所需修改的新数据框。 如果您只想根据条件替换列中的值,例如: 如果要对列执行某些操作并创建一个添加到数据框的新列: 如果希望新列的名称与旧列的名称相同,则可以添加其他步骤: 问题答案: 虽然您不能这

  • 如何使用Pandas更新/组合/合并数据帧(df1)和来自另一个数据帧(df2)的值,其中df1有一个新列(col3)和来自df2的值。可乐2?换句话说,df1是当前月份的值,我希望df1也有一个来自df2的列,它是上个月的值。 任何关于这方面的见解都是值得赞赏的;非常感谢你。 DF1: DF2: 所需df:

  • 问题内容: 我有两个Spark数据框: 数据框A: 和数据框B: 数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。 我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说,并可以改变值(可更新),但是是唯一的。我创建了一个哈希函数为: 现在,我想编写一些火花代码,基本上从B中

  • 问题内容: 首先,这个问题的标题太可怕了,但是我没有找到更好的方式来描述我的问题。 可能有一种非常简单的方法来执行此操作,但我无法弄清楚。这与这个问题非常相似,但是我在sqlite3(iOS)上运行,因此我怀疑我的选择受到更多限制。 我有一张带有产品记录的表格。所有记录都有一个ID(请注意:我不是在说行ID,而是每个产品唯一的标识号)。某些产品在表中可能有两个条目(两者都具有相同的ID)。唯一的区

  • 我正在寻找一个整洁的解决以下问题。 我有以下示例数据集: 我需要根据列2和列1中的前一个值填写NAs。如果我从第一个NA(第1列,第2行)开始,并且第2列大于1,那么我将把第1列的值(第1行)乘以第2列的值。那么如果col2小于等于1,那么col1正好等于前面的值。 这应该是最终结果: 我尝试了但没有得到想要的结果: 显然,我没有捕获我想要使用前一行的值的部分。任何帮助都将不胜感激。