当前位置: 首页 > 知识库问答 >
问题:

如何在Java/Kotlin中创建一个返回复杂类型的Spark UDF?

东门仲卿
2023-03-14
private val toPrice = UDF1<String, Map<String, String>> { s ->
    val elements = s.split(" ")
    mapOf("value" to elements[0], "currency" to elements[1])
}


val type = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.StringType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)

但任何时候我用这个:

df = df.withColumn("price", callUDF("toPrice", col("price")))

我得到一个隐秘的错误:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct<value:string,currency:string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    ... 19 more

我尝试使用自定义数据类型:

class Price(val value: Double, val currency: String) : Serializable

使用返回该类型的UDF:

private val toPrice = UDF1<String, Price> { s ->
    val elements = s.split(" ")
    Price(elements[0].toDouble(), elements[1])
}

但随后我得到另一个matcherror,它抱怨price类型。

如何正确地编写可以返回复杂类型的UDF?

共有1个答案

阮疏珂
2023-03-14

该函数应该返回一个org.apache.spark.sql.row类的对象。

Spark提供了UDF定义的两个主要变体。

> 使用Scala反射的

  • UDF变体:

      null

    定义

    Scala闭包...参数为用户定义函数(UDF)。数据类型是根据Scala闭包的签名自动推断的。

    这些变体不使用原子或代数数据类型的模式。例如,所讨论的函数将在Scala中定义:

    case class Price(value: Double, currency: String) 
    
    val df = Seq("1 USD").toDF("price")
    
    val toPrice = udf((s: String) => scala.util.Try { 
      s split(" ") match {
        case Array(price, currency) => Price(price.toDouble, currency)
      }
    }.toOption)
    
    df.select(toPrice($"price")).show
    // +----------+
    // |UDF(price)|
    // +----------+
    // |[1.0, USD]|
    // +----------+
    

    复杂结构(structs/structTypes)使用org.apache.spark.sql.row表示。不允许与代数数据类型或等价物混合。例如(Scala代码)

    struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
    

    应表示为

    Row(1, Row("foo", Row(-1.0, 42))))
    

    不是

    (1, ("foo", (-1.0, 42))))
    
    Row(1, Row("foo", (-1.0, 42))))
    

    提供此变体主要是为了确保Java互操作性。

    在这种情况下(相当于所述情况),定义应类似于以下情况:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.Row
    
    
    val schema = StructType(Seq(
      StructField("value", DoubleType, false),
      StructField("currency", StringType, false)
    ))
    
    val toPrice = udf((s: String) => scala.util.Try { 
      s split(" ") match {
        case Array(price, currency) => Row(price.toDouble, currency)
      }
    }.getOrElse(null), schema)
    
    df.select(toPrice($"price")).show
    // +----------+
    // |UDF(price)|
    // +----------+
    // |[1.0, USD]|
    // |      null|
    // +----------+
    
    UserDefinedFunction price = udf((String s) -> {
        String[] split = s.split(" ");
        return RowFactory.create(Double.parseDouble(split[0]), split[1]);
    }, DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("value", DataTypes.DoubleType, true),
        DataTypes.createStructField("currency", DataTypes.StringType, true)
    }));
    
    def createDataFrame(rows: List[Row], schema: StructType): DataFrame 
    

    或者使用带有产品序列的反射

    def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame 
    

    但不支持混合变体。

    换句话说,您应该提供可以使用rowencoder进行编码的输入。

    import org.apache.spark.sql.functions._
    
    df.withColumn("price", struct(
      split($"price", " ")(0).cast("double").alias("price"),
      split($"price", " ")(1).alias("currency")
    ))
    
      null

  •  类似资料:
    • 是否可以自动创建返回的setter? 尝试了以下操作,但这样它不起作用,但是这个例子显示了我想要实现的目标: 手动解决方案 编写coure的<code>setter</code>和<code>getter</code>我自己,如下所示: 问题 这个过程可以用kotlin自动化吗?有什么办法可以做到这一点吗?

    • 如何使用Spark Java中的StructType为以下数据定义数据类型? 字段包括:姓名、地址、ID、REPORTCARD 我有以下代码: 现在,我需要使用以下行将javaRDD转换为数据帧(数据集df): 我需要为此创建StructType架构。如何在Spark Java中定义它。 我创建了以下StructType模式: 但我得到了以下例外:

    • 我有两个bean类--乡村和城市。我需要在乡村班保留城市名单。另外,当我设置城市信息时,我需要设置国家名称,所以在城市类中也需要国家。怎么做?以下是代码: country.java

    • 现在我希望以泛型的方式使用这些类。 如何从方法“method1”和“method2”返回泛型类型(可以是猫或狗)。我有几个返回“T extends Animal”的方法,所以最好在方法级别或类级别中声明泛型类型。

    • 问题内容: 谁能帮助我在JAVA中找到方法的返回类型。我试过了 但不幸的是,它不起作用。请指导我。 输出: 不是int :: int 问题答案: 方法返回 你可以试试:

    • 问题内容: 假设我有此代码 我怎样才能使“别名”类型 到更容易重写的东西 基本上,我的问题是,如何为某些“类型”创建“别名”,因此我可以使其更容易编写,并且在需要更改整个程序代码时也更加容易。 谢谢,如果这是一个愚蠢的问题,对不起。我是Java的新手。 问题答案: Java中没有别名。您可以像这样用您的班级扩展班级: 但是请记住,这将是一个类,与您键入的将是不同的