当前位置: 首页 > 知识库问答 >
问题:

如何将数组传递给Spark(UDAF)中的用户定义聚合函数

古凌
2023-03-14

我想在UDAF中传递一个数组作为输入模式。

我给出的例子非常简单,它只是对2个向量求和。实际上我的用例更复杂,我需要使用UDAF。

import sc.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._

val df = Seq(
  (1, Array(10.2, 12.3, 11.2)),
  (1, Array(11.2, 12.6, 10.8)),
  (2, Array(12.1, 11.2, 10.1)),
  (2, Array(10.1, 16.0, 9.3)) 
  ).toDF("siteId", "bidRevenue")


class BidAggregatorBySiteId() extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("bidRevenue", ArrayType(DoubleType))))

  def bufferSchema = StructType(Array(StructField("sumArray", ArrayType(DoubleType))))

  def dataType: DataType = ArrayType(DoubleType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array(0.0, 0.0, 0.0))
      }

  def update(buffer: MutableAggregationBuffer, input: Row) = {
      val seqBuffer = buffer(0).asInstanceOf[IndexedSeq[Double]]
      val seqInput = input(0).asInstanceOf[IndexedSeq[Double]]
      buffer(0) = seqBuffer.zip(seqInput).map{ case (x, y) => x + y }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
     val seqBuffer1 = buffer1(0).asInstanceOf[IndexedSeq[Double]]
     val seqBuffer2 = buffer2(0).asInstanceOf[IndexedSeq[Double]]
     buffer1(0) = seqBuffer1.zip(seqBuffer2).map{ case (x, y) => x + y }
  }

  def evaluate(buffer: Row) = { 
    buffer
  }
}
val fun = new BidAggregatorBySiteId()

df.select($"siteId", $"bidRevenue" cast(ArrayType(DoubleType)))
.groupBy("siteId").agg(fun($"bidRevenue"))
.show

在“显示”动作之前,所有这些都可以很好地进行转换。但这部剧引发了一个错误:

斯卡拉。MatchError:[WrappedArray(21.4,24.9,22.0)](属于org.apache.spark.sql.execution.aggregate.InputAggregationBuffer类)。阿帕奇。火花sql。催化剂CatalystTypeConverters$ArrayConverter。toCatalystImpl(CatalystTypeConverters.scala:160)

我的数据帧的结构是:

root
 |-- siteId: integer (nullable = false)
 |-- bidRevenue: array (nullable = true)
 |    |-- element: double (containsNull = true)

df。dtypes=数组[(String,String)]=数组((“siteId”,“IntegerType”),(“bidRevenue”,“ArrayType(DoubleType,true)”)

坦克为你提供了宝贵的帮助。

共有1个答案

楚良平
2023-03-14
def evaluate(buffer: Row): Any

一旦一个组被完全处理以获得最终结果,就会调用上述方法。当您仅初始化和更新缓冲区的第0个索引时

i.e. buffer(0)  

因此,您需要在最后返回第0个索引值,因为您的聚合结果存储在0索引处。

  def evaluate(buffer: Row) = {
    buffer.get(0)
  }

上述对评估()方法的修改将导致:

// +------+---------------------------------+
// |siteId|bidaggregatorbysiteid(bidRevenue)|
// +------+---------------------------------+
// |     1|               [21.4, 24.9, 22.0]|
// |     2|               [22.2, 27.2, 19.4]|
// +------+---------------------------------+
 类似资料:
  • 我知道如何在SparkSQL中编写UDF: 我可以做类似的事情来定义聚合函数吗?这是怎么做到的? 对于上下文,我想运行以下SQL查询: 它应该会返回类似于 我希望聚合函数告诉我,在由< code>span和< code>timestamp定义的组中,是否有任何< code>opticalReceivePower的值低于阈值。我需要把我的UDAF写得和我上面粘贴的UDF不同吗?

  • 我很好奇在Spark中把一个RDD传递给一个函数到底做了什么。 假设我们如上定义一个函数。当我们调用函数并传递一个现有的RDD[String]对象作为输入参数时,这个my_function是否将这个RDD作为函数参数进行“复制”?换句话说,是按引用调用还是按值调用?

  • 我正在测试Cassandra中的UDF/UDA特性,看起来不错。但我在使用它时没有什么问题。 1) 在卡桑德拉。yaml,有人提到启用沙箱是为了避免邪恶代码,那么我们是否违反了规则,启用此支持(标志)会产生什么后果? 2)与在客户端读取数据和编写聚合逻辑相比,在Cassandra中使用UDF/UDA有什么优势? 3)此外,除了JAVA之外,是否有一种语言支持可用于编写UDF/UDA的nodejs、

  • 我试图编写< code>udaf来计算< code>percentile值。 我需要编写自定义函数,因为现有的火花函数,和使用舍入不同于我的需要。 我需要使用地板而不是中点舍入。我可以用<code>pyspark<code>编写它吗? 如果不是,如何在scala中实现这一点? 我需要使用以下方法计算:

  • 如何在不使其成为单独变量的情况下传递数组?例如,我知道这是有效的: 但我不想让数组成为变量,因为它只在这里使用。有没有办法做到这一点:

  • 我定义了一个变量: 在