当前位置: 首页 > 知识库问答 >
问题:

在数据帧分组用户中聚合时,无法执行用户定义的函数

弘阳德
2023-03-14

我有一个数据帧如下,我试图得到最大(总和)的用户组名称。

+-----+-----------------------------+
|name |nt_set                       |
+-----+-----------------------------+
|Bob  |[av:27.0, bcd:29.0, abc:25.0]|
|Alice|[abc:95.0, bcd:55.0]         |
|Bob  |[abc:95.0, bcd:70.0]         |
|Alice|[abc:125.0, bcd:90.0]        |
+-----+-----------------------------+

下面是我用来为用户获取最大值(总和)的自定义项

val maxfunc = udf((arr: Array[String]) => {
val step1 = arr.map(x => (x.split(":", -1)(0), x.split(":", -1)(1))).groupBy(_._1).mapValues(arr => arr.map(_._2.toInt).sum).maxBy(_._2)
val result = step1._1 + ":" + step1._2
result})

当我运行udf时,它抛出了下面的错误

 val c6 = c5.withColumn("max_nt", maxfunc(col("nt_set"))).show(false)

错误:无法执行用户定义的函数($anonfun$1:(数组)=

因为我需要在更大的数据集中实现这一点,所以如何以更好的方式实现这一点

预期的结果是

expected result:
+-----+-----------------------------+
|name |max_nt                       |
+-----+-----------------------------+
|Bob  |abc:120.0                    |
|Alice|abc:220.0                    |
+-----+-----------------------------+

共有2个答案

桓宜
2023-03-14

据我所知,你的例子是错误的。爱丽丝的bcd字段只有145个,而她的abc字段只有220个。所以abc也应该为她选择。如果我错了,那我误解了你的问题。

不管怎样,你不需要一个udf来做你想做的事。让我们生成您的数据:

val df = sc.parallelize(Seq(
    ("Bob", Array("av:27.0", "bcd:29.0", "abc:25.0")), 
    ("Alice", Array("abc:95.0", "bcd:55.0")), 
    ("Bob", Array("abc:95.0", "bcd:70.0")), 
    ("Alice", Array("abc:125.0", "bcd:90.0"))) )
        .toDF("name", "nt_set")

然后,一种方法是将nt_set分解成一个仅包含一个字符串/值对的列nt。

df.withColumn("nt", explode('nt_set))
  //then we split the string and the value
  .withColumn("nt_string", split('nt, ":")(0))
  .withColumn("nt_value", split('nt, ":")(1).cast("int"))
  //then we sum the values by name and "string"
  .groupBy("name", "nt_string")
  .agg(sum('nt_value) as "nt_value")
  /* then we build a struct with the value first to be able to select
     the nt field with max value while keeping the corresponding string */
  .withColumn("nt", struct('nt_value, 'nt_string))
  .groupBy("name")
  .agg(max('nt) as "nt")
  // And we rebuild the "nt" column.
  .withColumn("max_nt", concat_ws(":", $"nt.nt_string", $"nt.nt_value"))
  .drop("nt").show(false)

+-----+-------+
|name |max_nt |
+-----+-------+
|Bob  |abc:120|
|Alice|abc:220|
+-----+-------+
司马德水
2023-03-14

maxfunc的核心逻辑工作正常,只是它应该处理post groupBy数组列,这是一个嵌套的Seq集合:

val df = Seq(
  ("Bob", Seq("av:27.0", "bcd:29.0", "abc:25.0")),
  ("Alice", Seq("abc:95.0", "bcd:55.0")),
  ("Zack", Seq()),
  ("Bob", Seq("abc:50.0", null)),
  ("Bob", Seq("abc:95.0", "bcd:70.0")),
  ("Alice", Seq("abc:125.0", "bcd:90.0"))
).toDF("name", "nt_set")

import org.apache.spark.sql.functions._

val maxfunc = udf( (ss: Seq[Seq[String]]) => {
  val groupedSeq: Map[String, Double] = ss.flatMap(identity).
    collect{ case x if x != null => (x.split(":")(0), x.split(":")(1)) }.
    groupBy(_._1).mapValues(_.map(_._2.toDouble).sum)

  groupedSeq match {
    case x if x == Map.empty[String, Double] => ("", -999.0)
    case _ => groupedSeq.maxBy(_._2)
  }
} )

df.groupBy("name").agg(collect_list("nt_set").as("arr_nt")).
  withColumn("max_nt", maxfunc($"arr_nt")).
  select($"name", $"max_nt._1".as("max_key"), $"max_nt._2".as("max_val")).
  show
// +-----+-------+-------+
// | name|max_key|max_val|
// +-----+-------+-------+
// | Zack|       | -999.0|
// |  Bob|    abc|  170.0|
// |Alice|    abc|  220.0|
// +-----+-------+-------+
 类似资料:
  • 我在我的cassandra db中实现了用户定义的聚合函数average,如链接https://docs.datastax.com/en/dse/5.1/cql/cql/cql_using/usecreateuda.html所述 创建或替换对空输入调用的函数avgState(state Tuple ,val int)返回元组 语言Java为“if(val!=NULL){state.SetInt(0

  • 我正在测试Cassandra中的UDF/UDA特性,看起来不错。但我在使用它时没有什么问题。 1) 在卡桑德拉。yaml,有人提到启用沙箱是为了避免邪恶代码,那么我们是否违反了规则,启用此支持(标志)会产生什么后果? 2)与在客户端读取数据和编写聚合逻辑相比,在Cassandra中使用UDF/UDA有什么优势? 3)此外,除了JAVA之外,是否有一种语言支持可用于编写UDF/UDA的nodejs、

  • 我知道如何在SparkSQL中编写UDF: 我可以做类似的事情来定义聚合函数吗?这是怎么做到的? 对于上下文,我想运行以下SQL查询: 它应该会返回类似于 我希望聚合函数告诉我,在由< code>span和< code>timestamp定义的组中,是否有任何< code>opticalReceivePower的值低于阈值。我需要把我的UDAF写得和我上面粘贴的UDF不同吗?

  • 我试图编写< code>udaf来计算< code>percentile值。 我需要编写自定义函数,因为现有的火花函数,和使用舍入不同于我的需要。 我需要使用地板而不是中点舍入。我可以用<code>pyspark<code>编写它吗? 如果不是,如何在scala中实现这一点? 我需要使用以下方法计算:

  • 我想在UDAF中传递一个数组作为输入模式。 我给出的例子非常简单,它只是对2个向量求和。实际上我的用例更复杂,我需要使用UDAF。 在“显示”动作之前,所有这些都可以很好地进行转换。但这部剧引发了一个错误: 斯卡拉。MatchError:[WrappedArray(21.4,24.9,22.0)](属于org.apache.spark.sql.execution.aggregate.InputAg

  • 对于Cassandra中的用户定义聚合函数,什么可以作为INITCOND?我只见过具有简单类型(例如元组)的示例。 我为聚合函数中的状态对象提供了以下类型: 当我省略INITCOND时,我得到一个JavaNullPointerException。