当前位置: 首页 > 知识库问答 >
问题:

Spark:将UDF应用于Dataframe根据DF中的值生成新列

柯唯
2023-03-14

我在Scala中的DataFrame中转置值时遇到问题。我的初始DataFrame如下所示:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   X|   6|null|
|   B|   Z|null|   5|
|   C|   Y|   4|null|
+----+----+----+----+

col1col2是类型字符串col3,和col4Int

结果应该是这样的:

+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol4|
+----+----+----+----+------+------+------+
|   A|   X|   6|null|     6|  null|  null|
|   B|   Z|null|   5|  null|     5|  null|
|   C|   Y|   4|   4|  null|  null|     4|
+----+----+----+----+------+------+------+

这意味着三个新列应以col1col2和提取值的列命名。提取的值来自列col2col3col5取决于哪个值不是null

那么如何实现这一点呢?我首先想到的是这样一个< code>UDF:

def myFunc (col1:String, col2:String, col3:Long, col4:Long) : (newColumn:String, rowValue:Long) = {
    if col3 == null{
        val rowValue=col4;
        val newColumn=col1+col2+"col4";
    } else{
        val rowValue=col3;
        val newColumn=col1+col2+"col3";
     }
    return (newColumn, rowValue);
}

val udfMyFunc = udf(myFunc _ ) //needed to treat it as partially applied function

但我如何从数据帧以正确的方式调用它?

当然,上面的所有代码都是垃圾,还有更好的方法。因为我只是在处理第一个代码片段,所以让我知道…比较<code>Int已经不起作用了。

感谢任何帮助!谢谢!


共有2个答案

李耀
2023-03-14

好的,我有一个解决方法来实现我想要的东西。我执行以下操作:

(1)我使用< code>[newColumnName,rowValue]生成包含元组的新列。按照这个建议,从Spark数据帧中的单个列派生多个列

case class toTuple(newColumnName: String, rowValue: String)

def createTuple (input1:String, input2:String) : toTuple = {
    //do something fancy here
    var column:String= input1 + input2
    var value:String= input1        
    return toTuple(column, value)
}

val UdfCreateTuple = udf(createTuple _)

(2) 将函数应用于数据帧

dfNew= df.select($"*", UdfCreateTuple($"col1",$"col2").alias("tmpCol")

(3) 创建具有不同值 new列名的数组

val dfDistinct = dfNew.select($"tmpCol.newColumnName").distinct

(4)创建具有不同值的数组

var a = dfDistinct.select($"newCol").rdd.map(r => r(0).asInstanceOf[String])

var arrDistinct = a.map(a => a).collect()

(5)创建键值映射

var seqMapping:Seq[(String,String)]=Seq()
for (i <- arrDistinct){
    seqMapping :+= (i,i)
}

(6) 将映射应用于原始数据帧,参见基于另一列将值映射到特定列

val exprsDistinct = seqMapping.map { case (key, target) => 
  when($"tmpCol.newColumnName" === key, $"tmpCol.rowValue").alias(target) }

val dfFinal = dfNew.select($"*" +: exprsDistinct: _*)

好吧,这有点麻烦,但我可以在不知道有多少列的情况下导出一组新列,同时将值转移到新列中。

柯默
2023-03-14
匿名用户

有一种更简单的方法

val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1
          .withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2
          .groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3
          .pivot("newCol") // Step 4
          .agg(max($"value")) // Step 5
          .orderBy($"newCol") // Step 6
          .drop($"newCol") // Step 7

      df3.show()

步骤如下:

    < li >添加一个新列,其中包含与col2连接的col1的内容 < li>//添加一个新列“value ”,它包含col3或col4的非空内容 < li >按所需的列分组 < li >透视newCol,它包含现在将成为列标题的值 < li >按最大值聚合,如果groupBy是每组单值,则最大值为值本身;或者<代码>。agg(first($ " value ")如果值恰好是字符串而不是数值类型- max函数只能应用于数值类型 < li>order by newCol,因此DF按升序排列 < li >删除此列,因为您不再需要它,或者如果您想要一列没有空值的值,请跳过此步骤

感谢@user8371915,他首先帮助我回答了自己的支点问题。

结果如下:

+----+----+----+----+----+----+----+
|col1|col2|col3|col4|  AX|  BZ|  CY|
+----+----+----+----+----+----+----+
|   A|   X|   6|null|   6|null|null|
|   B|   Z|null|   5|null|   5|null|
|   C|   Y|   4|   4|null|null|   4|
+----+----+----+----+----+----+----+

您可能需要尝试将列标题字符串串联起来,以获得正确的结果。

 类似资料:
  • 我有一个如下所示的数据框架 我写了一个UDF来将分类转换为二进制和 我将此应用于数据帧,如下所示 如何将多个列传递到 UDF 中,这样我就不必对其他分类列重复自己?

  • 现在,我想在一个函数中使用这个,如下所示- 然后使用此函数在我的DataFrame中创建一个新列 总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]] 我得到以下错误。我在这里做错了什么? 原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式

  • 我想根据Scala映射所表示的配置来转换数据帧中的一些列。 我有 2 个案例: < li >接收映射< code>Map[String,Seq[String]]和列col1、col2,以转换col3(如果在key = col1的映射中存在实体,并且col2在此实体值列表中)。 < li >接收map 和col1,col2,以在Map中存在具有key = col1的实体并且col2在由Long元组描

  • 您可以从以下链接下载生成的.json示例:https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json? 特别是,您可以看到其中每一个中的实际数据实际上都是一个字典:我们感兴趣的“features”列的形式如下:

  • 我正在使用pyspark,用spark-csv将一个大型csv文件加载到dataframe中,作为预处理步骤,我需要对其中一列(包含json字符串)中的可用数据应用各种操作。它将返回X个值,每个值都需要存储在它们自己单独的列中。 该功能将在UDF中实现。但是,我不确定如何从该UDF返回一个值列表,并将其输入到各个列中。下面是一个简单的例子: 生成以下内容: