当前位置: 首页 > 知识库问答 >
问题:

从 Spark 数据帧中的单个列派生多个列

阎宝
2023-03-14

我有一个DF,其中包含一个巨大的可解析元数据,作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA。

我想通过一个函数,ClassXYZ = Func1(ColmnA)将ColmnA这一列分成多个列。这个函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新的列,比如ColmnA1、ColmnA2等。

我如何通过调用Func1一次来完成从一个数据帧到另一个数据帧的转换,而不需要重复它来创建所有的列。

如果我每次都调用这个巨大的函数来添加一个新列,这很容易解决,但这是我希望避免的。

请告知工作或伪代码。

谢谢

桑贾伊

共有3个答案

田志
2023-03-14

如果结果列的长度与原始列的长度相同,那么可以使用withColumn函数并通过应用udf来创建全新的列。在此之后,您可以删除您的原始专栏,例如:

 val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
.withColumn("newCol2", myFun2(myDf("originalColumn"))
.drop(myDf("originalColumn"))

其中myFun是一个定义如下的udf:

   def myFun= udf(
    (originalColumnContent : String) =>  {
      // do something with your original column content and return a new one
    }
  )
柴茂材
2023-03-14

假设函数之后有一系列元素,给出如下示例:

val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
df.show
+------------------+---+
|          infoComb|age|
+------------------+---+
|Mike,1986,Toronto| 30|
| Andre,1980,Ottawa| 36|
|  jill,1989,London| 27|
+------------------+---+

现在您可以使用此infoComb做的是,您可以开始拆分字符串并获取更多列:

df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
+-----+----------+-------+---+
| name|yearOfBorn|   city|age|
+-----+----------+-------+---+
|Mike|      1986|Toronto| 30|
|Andre|      1980| Ottawa| 36|
| jill|      1989| London| 27|
+-----+----------+-------+---+

希望这有帮助。

柴衡
2023-03-14

一般来说,你想要的不是直接可能的。UDF一次只能返回一列。有两种不同的方法可以克服此限制:

>

  • 返回复杂类型的列。最一般的解决方案是结构类型,但您也可以考虑数组类型映射类型

    import org.apache.spark.sql.functions.udf
    
    val df = Seq(
      (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")
    
    case class Foobar(foo: Double, bar: Double)
    
    val foobarUdf = udf((x: Long, y: Double, z: String) => 
      Foobar(x * y, z.head.toInt * y))
    
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+
    
    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)
    

    这可以很容易地变平,但通常没有必要这样做。

    转战RDD,重塑重建DF:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
      Seq(x * y, z.head.toInt * y)
    
    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
    val df2 = sqlContext.createDataFrame(rows, schema)
    
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
    

  •  类似资料:
    • 问题内容: 我有一个像下面的数据框。 我需要基于多个条件派生Flag列。 我需要比较触发器1 -3列的得分和身高列。 标志栏: 如果得分大于等于触发器1并且高度小于8,则红色- 如果得分大于等于触发器2并且高度小于8,则黄色- 如果得分大于等于触发器3并且高度小于8,则橙色- 如果高度大于8,则留空 如果在pandas数据框中有其他条件并导出列,该如何写? 预期产量 对于我原始问题中的其他列Tex

    • 有没有比调用多个帧更好的方法来同时为给定的 SparkSQL 添加前缀或重命名所有或多个列? 例如,如果我想检测更改(使用完全外连接)。然后我剩下两个具有相同结构的< code >数据帧。

    • 我正在实施一个项目,其中MySql数据被导入到hdfs使用sqoop。它有将近30张桌子。我通过推断模式和注册为临时表来读取每个表作为数据帧。我做这件事有几个问题...1.假设df1到df10的表需要实现几个连接。在MySQL中,查询将是而不是使用是否有其他连接所有数据帧有效地基于条件...

    • 我有一个包含大量列的Spark数据框架。我想从中删除两列以获得新的数据帧。 如果列更少,我可以在API中使用select方法,如下所示: 但是既然从长列表中挑选列是一项乏味的任务,有解决方法吗?

    • 我有两个不同列数和行数的CSV文件。第一个CSV文件有M列和N行,第二个文件有H列和G行。一些列具有相同的名称。 null 另外,如果两个CSV文件有两个数据帧,并希望这样做,例如,如果我将第一个CSV加载到中,将第二个加载到中,然后希望合并到,类似于上面的示例。

    • 我在spark dataframe中有一个包含文本的列。 我想提取所有以特殊字符开头的单词,我正在使用从该文本列的每一行中提取。如果文本包含以开头的多个单词,则只返回第一个单词。 我正在寻找提取多个符合我在火花模式的单词。 样本输入:< code>@always_nidhi @YouTube不,我不明白,但我喜欢他们的音乐和舞蹈真棒这首mve的所有歌曲都很摇滚 示例输出: