当前位置: 首页 > 知识库问答 >
问题:

火花UDF过载

益和雅
2023-03-14

我有一个要求,火花UDF必须超载,我知道UDF超载是不支持火花。因此,为了克服spark的这一限制,我尝试创建一个接受任何类型的UDF,它在UDF中找到实际的数据类型,并调用相应的方法进行计算并相应地返回值。这样做时,我得到一个错误

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Any is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
    at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:213)
    at com.experian.spark_jobs.Test$.main(Test.scala:9)
    at com.experian.spark_jobs.Test.main(Test.scala)

以下是示例代码:

import org.apache.spark.sql.SparkSession

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
    spark.udf.register("testudf", testudf _)
    spark.sql("create temporary view testView as select testudf(1, 2) as a").show()
    spark.sql("select testudf(a, 5) from testView").show()

  }

  def testudf(a: Any, b: Any) = {
    if (a.isInstanceOf[Integer] && b.isInstanceOf[Integer]) {
      add(a.asInstanceOf[Integer], b.asInstanceOf[Integer])
    } else if (a.isInstanceOf[java.math.BigDecimal] && b.isInstanceOf[java.math.BigDecimal]) {
      add(a.asInstanceOf[java.math.BigDecimal], b.asInstanceOf[java.math.BigDecimal])
    }
  }

  def add(decimal: java.math.BigDecimal, decimal1: java.math.BigDecimal): java.math.BigDecimal = {
    decimal.add(decimal1)
  }

  def add(integer: Integer, integer1: Integer): Integer = {
    integer + integer1
  }
}

有可能使上述要求成为可能吗?如果没有,请建议我一个更好的方法。

注:Spark版本-2.4.0

共有1个答案

宗政霄
2023-03-14

使用Dataframe(未类型化)的问题是在编译时执行某种多态之类的操作非常痛苦。理想情况下,拥有列类型将允许使用特定的“添加函数”实现构建您的udfs,就像您使用Monides一样。但是Spark Dataframe API离这个世界非常遥远。使用数据集或无框架帮助很大。

在您的示例中,要在运行时检查类型,您将需要AnyRef而不是Any。那应该有用。

 类似资料:
  • 我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查

  • 我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案? 似乎我需要行类型的UDF,类似 这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了:

  • 我目前正在使用上面的UDF将一列字符串解析成一个键和值的数组。“50:63.25,100:58.38”到[[50,63.2],[100,58.38]]。在某些情况下,字符串是“\N”,我无法解析列值。如果字符串是“\N”,那么我应该返回一个空数组。有人能帮我处理这个异常或帮我添加一个新的案例吗?我是spark-scala的新手。 错误:scala.MatchError:[Ljava.lang.St

  • 这个函数的作用是将字符串列解析为键和值的数组。""to。这是我的UDF,它创建了一个包装的int和Double结构元素数组。 有些情况下,输入字符串的格式不正确,我会得到一个错误:输入字符串的< code > Java . lang . numberformatexception :因为“< code>k.trim.toInt”无法转换像“< code>.01-4.1293”这样的脏数据,这是一个

  • 想象一下下面的代码: 如何定义myUdf的返回类型,以便查看代码的人立即知道它返回了一个Double?

  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一