当前位置: 首页 > 知识库问答 >
问题:

在Spark数据帧udf中,像结构(col1, col2)的函数参数类型是什么?

芮宇航
2023-03-14

背景:

我有一个数据帧,它有三列:< code>id,x,y。x,y是双精度的。

  • 首先,我构造(“x”),col(“y”))来获取坐标列。
  • 然后组比(“id”)和 agg(collect_list(“坐标”))

所以现在df只有两列:id,坐标

我认为坐标的数据类型是集合.可变.包裹阵列[(双,双)]。所以我把它传给了udf。但是,数据类型是错误的。我在运行代码时遇到错误。我不知道为什么。结构(col1,col2)的真实数据类型是什么?或者有另一种方法可以轻松获得正确答案?

这是代码

def getMedianPoint = udf((array1: collection.mutable.WrappedArray[(Double,Double)]) => {  
    var l = (array1.length/2)
    var c = array1(l)
    val x = c._1.asInstanceOf[Double]
    val y = c._2.asInstanceOf[Double]
    (x,y)
})

df.withColumn("coordinate",struct(col("x"),col("y")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate")
  .withColumn("median",getMedianPoint(col("coordinate")))

非常感谢!

共有1个答案

廖夜洛
2023-03-14

我认为坐标的数据类型是collection.mutable.WrappedArray[(双,双)]

是的,你这么说完全正确。你在udf函数中定义的数据类型和你传递的参数也是正确的。但是主要问题是struct列的键的名称。因为您肯定有以下问题

由于数据类型不匹配,无法解析“UDF(坐标):参数 1 需要数组

只需使用< code>alias将结构键重命名为

df.withColumn("coordinate",struct(col("x").as("_1"),col("y").as("_2")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate"))
    .withColumn("median",getMedianPoint(col("coordinate")))

以便keyname匹配。

这将引起另一个问题

  var c = array1(l)

原因:java.lang.类投射异常: 组织.apache.spark.sql.catalyst.expressions.通用行与化学不能被投射到标拉。元组 2

因此,我建议您将udf函数更改为

import org.apache.spark.sql.functions._

def getMedianPoint = udf((array1: Seq[Row]) => {
  var l = (array1.length/2)
  (array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
})

这样您甚至不需要使用alias。所以完整的解决方案是

import org.apache.spark.sql.functions._

def getMedianPoint = udf((array1: Seq[Row]) => {
  var l = (array1.length/2)
  (array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
})

df.withColumn("coordinate",struct(col("x"),col("y")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate"))
    .withColumn("median",getMedianPoint(col("coordinate")))
  .show(false)

我希望答案有帮助

 类似资料:
  • 我需要展平一个数据帧,以便将其与Spark(Scala)中的另一个数据帧连接起来。 基本上,我的2个数据帧有以下模式: 数据流1 DF2 老实说,我不知道如何使DF2变平。最后,我需要连接DF.field4 = DF2.field9上的2个数据帧 我用的是2.1.0 我的第一个想法是使用爆炸,但在Spark 2.1.0中已经被否决了,有人能给我一点提示吗?

  • 我最近遇到了这种不同寻常的Java语法。。。下面是一个例子: 注意

  • 问题内容: 这是一段可以毫无问题运行的代码: 但是,如果该函数存在于另一个包中(例如),则该代码将不起作用: 我的问题是: 有没有一种方法可以使用匿名结构作为参数来调用(公共)函数(又名上文)? 以空struct作为参数的函数(也称为上文)可以被调用,即使它存在于另一个包中。这是特例吗? 好吧,我知道我总是可以命名来解决问题,我对此感到很好奇,并且想知道为什么似乎不允许这样做。 问题答案: 您的匿

  • 当你决定看这篇文章,就意味着系统学习 数据结构的开始。本节,我们先来讲什么是 数据结构。   数据结构,直白地理解,就是研究数据的存储方式。 我们知道,数据存储只有一个目的,即为了方便后期对数据的再利用,就如同我们使用数组存储 是为了后期取得它们的加和值,无缘由的数据存储行为是对存储空间的不负责任。 因此,数据在计算机存储空间的存放,决不是胡乱的,这就要求我们选择一种好的方式来存储数据,而这也是数

  • 有人可以帮助我解决这个问题,我与火花数据帧? 当我执行myFloatRDD时。toDF()我收到一个错误: 类型错误:无法推断类型的架构:类型“浮动” 我不明白为什么... 例子: 谢谢

  • 我有一个包含结构数组的嵌套源json文件。结构的数量因行而异,我想使用Spark(scala)从结构的键/值动态创建新的数据框架列,其中键是列名,值是列值。 这里有一个由3个结构组成的数组,但这3个结构需要动态地拆分为3个单独的列(3个的数量可能会有很大的变化),我不知道如何做到这一点。 请注意,数组中的每个数组元素都产生了3个新列。 我认为理想的解决方案与本SO帖子中讨论的类似,但有两个主要区别