我想在UDAF中传递一个数组作为输入模式。
我给出的例子非常简单,它只是对2个向量求和。实际上我的用例更复杂,我需要使用UDAF。
import sc.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._
val df = Seq(
(1, Array(10.2, 12.3, 11.2)),
(1, Array(11.2, 12.6, 10.8)),
(2, Array(12.1, 11.2, 10.1)),
(2, Array(10.1, 16.0, 9.3))
).toDF("siteId", "bidRevenue")
class BidAggregatorBySiteId() extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(Array(StructField("bidRevenue", ArrayType(DoubleType))))
def bufferSchema = StructType(Array(StructField("sumArray", ArrayType(DoubleType))))
def dataType: DataType = ArrayType(DoubleType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, Array(0.0, 0.0, 0.0))
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
val seqBuffer = buffer(0).asInstanceOf[IndexedSeq[Double]]
val seqInput = input(0).asInstanceOf[IndexedSeq[Double]]
buffer(0) = seqBuffer.zip(seqInput).map{ case (x, y) => x + y }
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
val seqBuffer1 = buffer1(0).asInstanceOf[IndexedSeq[Double]]
val seqBuffer2 = buffer2(0).asInstanceOf[IndexedSeq[Double]]
buffer1(0) = seqBuffer1.zip(seqBuffer2).map{ case (x, y) => x + y }
}
def evaluate(buffer: Row) = {
buffer
}
}
val fun = new BidAggregatorBySiteId()
df.select($"siteId", $"bidRevenue" cast(ArrayType(DoubleType)))
.groupBy("siteId").agg(fun($"bidRevenue"))
.show
在“显示”动作之前,所有这些都可以很好地进行转换。但这部剧引发了一个错误:
斯卡拉。MatchError:[WrappedArray(21.4,24.9,22.0)](属于org.apache.spark.sql.execution.aggregate.InputAggregationBuffer类)。阿帕奇。火花sql。催化剂CatalystTypeConverters$ArrayConverter。toCatalystImpl(CatalystTypeConverters.scala:160)
我的数据帧的结构是:
root
|-- siteId: integer (nullable = false)
|-- bidRevenue: array (nullable = true)
| |-- element: double (containsNull = true)
df。dtypes=数组[(String,String)]=数组((“siteId”,“IntegerType”),(“bidRevenue”,“ArrayType(DoubleType,true)”)
坦克为你提供了宝贵的帮助。
def evaluate(buffer: Row): Any
一旦一个组被完全处理以获得最终结果,就会调用上述方法。当您仅初始化和更新缓冲区的第0个索引时
i.e. buffer(0)
因此,您需要在最后返回第0个索引值,因为您的聚合结果存储在0索引处。
def evaluate(buffer: Row) = {
buffer.get(0)
}
上述对评估()方法的修改将导致:
// +------+---------------------------------+
// |siteId|bidaggregatorbysiteid(bidRevenue)|
// +------+---------------------------------+
// | 1| [21.4, 24.9, 22.0]|
// | 2| [22.2, 27.2, 19.4]|
// +------+---------------------------------+
我知道如何在SparkSQL中编写UDF: 我可以做类似的事情来定义聚合函数吗?这是怎么做到的? 对于上下文,我想运行以下SQL查询: 它应该会返回类似于 我希望聚合函数告诉我,在由< code>span和< code>timestamp定义的组中,是否有任何< code>opticalReceivePower的值低于阈值。我需要把我的UDAF写得和我上面粘贴的UDF不同吗?
我很好奇在Spark中把一个RDD传递给一个函数到底做了什么。 假设我们如上定义一个函数。当我们调用函数并传递一个现有的RDD[String]对象作为输入参数时,这个my_function是否将这个RDD作为函数参数进行“复制”?换句话说,是按引用调用还是按值调用?
我正在测试Cassandra中的UDF/UDA特性,看起来不错。但我在使用它时没有什么问题。 1) 在卡桑德拉。yaml,有人提到启用沙箱是为了避免邪恶代码,那么我们是否违反了规则,启用此支持(标志)会产生什么后果? 2)与在客户端读取数据和编写聚合逻辑相比,在Cassandra中使用UDF/UDA有什么优势? 3)此外,除了JAVA之外,是否有一种语言支持可用于编写UDF/UDA的nodejs、
我试图编写< code>udaf来计算< code>percentile值。 我需要编写自定义函数,因为现有的火花函数,和使用舍入不同于我的需要。 我需要使用地板而不是中点舍入。我可以用<code>pyspark<code>编写它吗? 如果不是,如何在scala中实现这一点? 我需要使用以下方法计算:
如何在不使其成为单独变量的情况下传递数组?例如,我知道这是有效的: 但我不想让数组成为变量,因为它只在这里使用。有没有办法做到这一点:
我定义了一个变量: 在