当前位置: 首页 > 知识库问答 >
问题:

火花UDF零处理

通鸿风
2023-03-14

我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成:

数据帧()具有以下架构:

root
 |-- point: struct (nullable = true)
 |    |-- x: float (nullable = true)
 |    |-- y: float (nullable = true)

例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查pointxy是否null

第一种方法:

val sum = udf((x:Float,y:Float) => x+y)

points.withColumn("sum",sum($"point.x",$"point.y"))

如果<code>struct是否为空,因为在scala中<code>浮点不能为空。

第二种方法:

val sum = udf((pt:Row) => pt.getFloat(0)+pt.getFloat(1))
points.withColumn("sum",sum($"point"))

这种方法,我可以在我的udf中检查pt是否为空,但我可以检查xy,因为Floats不能为空。在这种情况下,我得到了NullPointerException

我怎样才能写一个udf win来检查结构以及x和y是否为空?

我用的是spark 1.6.1

更新:与这个问题相反,我处理的是浮点数而不是字符串(字符串在scala中可以为空,浮点数不能)

共有1个答案

江嘉悦
2023-03-14

您可以使用<code>行。isNullAt(i)检查ith字段是否为空。在您的情况下,您应该将udf编写为,

sum = udf((point: Row) => point.match {
  case p if (p.isNullAt(0) && p.isNullAt(0)) => 0f
  case p if p.isNullAt(0) => p.getFloat(1)
  case p if p.isNullAt(1) => p.getFloat(0)
  case p => p.getFloat(0) + p.getFloat(1)
})
 类似资料:
  • 我有一个要求,火花UDF必须超载,我知道UDF超载是不支持火花。因此,为了克服spark的这一限制,我尝试创建一个接受任何类型的UDF,它在UDF中找到实际的数据类型,并调用相应的方法进行计算并相应地返回值。这样做时,我得到一个错误 以下是示例代码: 有可能使上述要求成为可能吗?如果没有,请建议我一个更好的方法。 注:Spark版本-2.4.0

  • 我目前正在使用上面的UDF将一列字符串解析成一个键和值的数组。“50:63.25,100:58.38”到[[50,63.2],[100,58.38]]。在某些情况下,字符串是“\N”,我无法解析列值。如果字符串是“\N”,那么我应该返回一个空数组。有人能帮我处理这个异常或帮我添加一个新的案例吗?我是spark-scala的新手。 错误:scala.MatchError:[Ljava.lang.St

  • 这个函数的作用是将字符串列解析为键和值的数组。""to。这是我的UDF,它创建了一个包装的int和Double结构元素数组。 有些情况下,输入字符串的格式不正确,我会得到一个错误:输入字符串的< code > Java . lang . numberformatexception :因为“< code>k.trim.toInt”无法转换像“< code>.01-4.1293”这样的脏数据,这是一个

  • 我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案? 似乎我需要行类型的UDF,类似 这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了:

  • 想象一下下面的代码: 如何定义myUdf的返回类型,以便查看代码的人立即知道它返回了一个Double?

  • 我有一个用例,我必须以FIFO方式处理事件。这些是从机器生成的事件。每台机器每30秒生成一个事件。对于特定的机器,我们需要根据FIFO FASION对事件进行处理。 我们每天需要处理大约2.4亿个事件。对于如此大的规模,我们需要使用Kafka+火花流 从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进