当前位置: 首页 > 知识库问答 >
问题:

scala spark使用spark shell中的udf函数在dataframe列中进行数组操作

连曜灿
2023-03-14

scala/火花在火花外壳中使用udf函数在数据框列中进行数组操作

< code>df.printSchema

root
|-- x: timestamp (nullable = true)
|-- date_arr: array (nullable = true)
|    |-- element: timestamp (containsNull = true)

样本数据:

|x                      | date_arr                                                              |  
|---------------------- |---------------------------------------------------------------------- |  
| 2009-10-22 19:00:00.0 | [2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0] |  
| 2010-10-02 19:00:00.0 | [2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0]                        |  

在udf.jar,我有这个函数来获取上限日期在date_arr根据x:

class CeilToDate extends UDF {
  def evaluate(arr: Seq[Timestamp], x: Timestamp): Timestamp = {
    arr.filter(_.before(x)).last
  }
}

添加jar到火花外壳:火花外壳--jarsudf.jar

在火花外壳,我有HiveContext作为val hc=new HiveContext(spc),并创建函数:hc.sql("创建临时函数ceil_to_date'com.abc.udf.CeilToDate'")

当我进行查询时:hc.sql(“选择ceil_to_date(date_arr,x)作为来自df的ceildate”).显示,期望有一个这样的数据帧:

|ceildate              |        
|----------------------|  
|2009-09-19 19:00:00.0 |  
|2010-09-25 19:00:00.0 |  

但是,它会抛出以下错误:

组织 apache.spark.sql.AnalysisException: 没有用于 Hive udf 类 com.abc.udf.CeilToDate 的处理程序,因为: 没有匹配方法用于类 com.abc.udf.CeilToDate 与 (数组,时间戳)。可能的选项: FUNC(结构

共有1个答案

哈朗
2023-03-14

为什么要经历创建udf jar并将jar包含在spack-shell中的所有复杂性。您可以在spack-shell中创建一个并在您的数据框架中使用它。

假设您的数据

scala> df.show(false)
+---------------------+---------------------------------------------------------------------+
|x                    |date_arr                                                             |
+---------------------+---------------------------------------------------------------------+
|2009-10-22 19:00:00.0|[2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0]|
|2010-10-02 19:00:00.0|[2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0]                       |
+---------------------+---------------------------------------------------------------------+

您可以在spark shell中创建udf函数,但在此之前,您需要三个导入。

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> import scala.collection._
import scala.collection._

然后你可以创建一个udf函数

scala> def ceil_to_date = udf((arr: mutable.WrappedArray[Timestamp], x: Timestamp) => arr.filter(_.before(x)).last)
ceil_to_date: org.apache.spark.sql.expressions.UserDefinedFunction

您所需的输出<code>数据帧

scala> df.select(ceil_to_date(col("date_arr"), col("x")).as("ceildate")).show(false)
+---------------------+
|ceildate             |
+---------------------+
|2009-09-19 19:00:00.0|
|2010-09-25 19:00:00.0|
+---------------------+

我希望答案是有帮助的

 类似资料:
  • 我有一个这样的数据帧: 现在,我需要创建新的列“col3”,并且我必须根据col1值在col3中放入新值。生成的数据框如下所示。 比如,如果col1的值为“a”,那么col3中应该有“apple”。如果col1的值为“b”,那么col3中应该有“banana”。如果col1的值为“c”,那么col3中应该有“custard”。 注:col2为正常列,请不要考虑。 我可以得到任何PYSpark UD

  • 现在,我想在一个函数中使用这个,如下所示- 然后使用此函数在我的DataFrame中创建一个新列 总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]] 我得到以下错误。我在这里做错了什么? 原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式

  • 我试图计算在一个Tibble中源向量和比较向量之间的Jaccard相似度。 jaccard_sim中的所有值都为零。但是,如果我们运行类似这样的东西,我们得到第一个条目的正确的Jaccard相似度为0.2:

  • 我有一个数据框,如: 我需要为每个列应用一些函数,并在这个数据帧中创建具有特殊名称的新列。 所以我需要根据列和(如name)乘以两个额外的列,名称为和由两个。是否可以使用或其他结构来完成此操作?

  • 我有一个spark数据帧,如: 以 如何构造一个在列上运行的UDF,即由火花创建的包装数组,以计算变量平均值?

  • 本文向大家介绍PHP中使用数组指针函数操作数组示例,包括了PHP中使用数组指针函数操作数组示例的使用技巧和注意事项,需要的朋友参考一下 数组的内部指针是数组内部的组织机制,指向一个数组中的某个元素。默认是指向数组中第一个元素通过移动或改变指针的位置,可以访问数组中的任意元素。对于数组指针的控制PHP提供了以下几个内建函数可以利用。 ★current():取得目前指针位置的内容资料。 ★key():