当前位置: 首页 > 知识库问答 >
问题:

将UDF应用于Spark Dataframe中的多列

羊舌昆杰
2023-03-14

我有一个如下所示的数据框架

| id| age|   rbc|  bgr| dm|cad|appet| pe|ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|  3|48.0|normal|117.0| no| no| poor|yes|yes|           ckd|
....
....
....

我写了一个UDF来将分类是,否,差,正常转换为二进制01s

def stringToBinary(stringValue: String): Int = {
    stringValue match {
        case "yes" => return 1
        case "no" => return 0
        case "present" => return 1
        case "notpresent" => return 0
        case "normal" => return 1
        case "abnormal" => return 0
    }
}

val stringToBinaryUDF = udf(stringToBinary _)

我将此应用于数据帧,如下所示

val newCol = stringToBinaryUDF.apply(col("pc")) //creates the new column with formatted value
val refined1 = noZeroDF.withColumn("dm", newCol) //adds the new column to original

如何将多个列传递到 UDF 中,这样我就不必对其他分类列重复自己?

共有3个答案

车靖琪
2023-03-14

UDF可以采用许多参数,即许多列,但它应该返回一个结果,即一列。

为此,只需将参数添加到字符串ToBinary函数即可完成。

如果您希望它包含两列,它将如下所示:

def stringToBinary(stringValue: String, secondValue: String): Int = {
stringValue match {
    case "yes" => return 1
    case "no" => return 0
    case "present" => return 1
    case "notpresent" => return 0
    case "normal" => return 1
    case "abnormal" => return 0
}
}
val stringToBinaryUDF = udf(stringToBinary _)

希望这有帮助

柳翼
2023-03-14

您也可以使用折叠左转功能。将 UDF 称为字符串到二进制UDF

import org.apache.spark.sql.functions._

val categoricalColumns = Seq("rbc", "cad", "rbc", "pe", "ane")
val refinedDF = categoricalColumns
    .foldLeft(noZeroDF) { (accumulatorDF: DataFrame, columnName: String) =>
         accumulatorDF
            .withColumn(columnName, stringToBinaryUDF(col(columnName)))
     }

这将尊重不变性和函数式编程。

严俊彦
2023-03-14

如果您有 spark 函数来执行与 udf 函数序列化和反序列化列数据相同的工作,则不应选择 udf 函数。

给定一个数据帧作为

+---+----+------+-----+---+---+-----+---+---+--------------+
|id |age |rbc   |bgr  |dm |cad|appet|pe |ane|classification|
+---+----+------+-----+---+---+-----+---+---+--------------+
|3  |48.0|normal|117.0|no |no |poor |yes|yes|ckd           |
+---+----+------+-----+---+---+-----+---+---+--------------+

您可以通过< code>when函数来实现您的要求

import org.apache.spark.sql.functions._
def applyFunction(column : Column) = when(column === "yes" || column === "present" || column === "normal", lit(1))
  .otherwise(when(column === "no" || column === "notpresent" || column === "abnormal", lit(0)).otherwise(column))

df.withColumn("dm", applyFunction(col("dm")))
  .withColumn("cad", applyFunction(col("cad")))
  .withColumn("rbc", applyFunction(col("rbc")))
  .withColumn("pe", applyFunction(col("pe")))
  .withColumn("ane", applyFunction(col("ane")))
  .show(false)

结果是

+---+----+---+-----+---+---+-----+---+---+--------------+
|id |age |rbc|bgr  |dm |cad|appet|pe |ane|classification|
+---+----+---+-----+---+---+-----+---+---+--------------+
|3  |48.0|1  |117.0|0  |0  |poor |1  |1  |ckd           |
+---+----+---+-----+---+---+-----+---+---+--------------+

现在问题清楚地表明,您不想对所有列重复该过程,因此您可以执行以下操作

val columnsTomap = df.select("rbc", "cad", "rbc", "pe", "ane").columns

var tempdf = df
columnsTomap.map(column => {
  tempdf = tempdf.withColumn(column, applyFunction(col(column)))
})

tempdf.show(false)
 类似资料:
  • 我在Scala中的中转置值时遇到问题。我的初始如下所示: 和是类型

  • 我有一个模式和要应用UDF的列名称。列的名称是用户输入,每个输入的数字可能不同。有没有办法将UDF应用于数据帧中的N列? 试图实现这一点。对于具有col1、col2、col3、col4、col5的模式 有什么想法吗?

  • 我似乎在网上找不到任何例子。Cassandra 3.0支持可以用Java、Python或其他语言编写的UDF和UDA。 如何设置Python中定义的UDF或UDA(我指的是Python函数,而不是通过Python驱动程序在Java或Javascript中定义的函数)?

  • 我有一个派斯帕克数据帧 我想将其转换为与 pyspark.ml 一起使用。我可以使用字符串索引器将名称列转换为数字类别: 如何用StringIndexer(例如< code>name和< code>food,每个列都有自己的< code>StringIndexer)转换几个列,然后用VectorAssembler生成一个特征向量?还是必须为每一列创建一个< code>StringIndexer?

  • 问题内容: 该文档展示了如何使用输出列名称作为键的字典一次在groupby对象上应用多个功能: 但是,这仅适用于Series groupby对象。同样,当将字典类似地传递到groupby DataFrame时,它期望键是将应用该函数的列名。 我想做的是对多个列应用多个功能(但是某些列将被多次操作)。同样,某些函数将依赖于groupby对象中的其他列(如sumif函数)。我当前的解决方案是逐列进行操