当前位置: 首页 > 知识库问答 >
问题:

定义UDAF时出现Spark-sql错误

解沈义
2023-03-14
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import java.text.SimpleDateFormat
import java.util.Date

class AggregateTS extends UserDefinedAggregateFunction{
    def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

    def bufferSchema: StructType = StructType(StructField("intermediate", StringType)::Nil)

    def dataType: DataType = StringType

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = "Init"
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (buffer.getAs[String](0) == "Init"){
            buffer(0) = input.getAs[String](0)
        }
        else{
            // add two string
            buffer(0) = average_ts(input.getAs[String](0), buffer.getAs[String](0))
        }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2:Row):Unit = {
        buffer1(0) = average_ts(buffer1.getAs[String](0), buffer2.getAs[String](0))
    }

    def evaluate(buffer: Row): Any = {
        buffer.getAs[String](0)
    }
}
error: not found: type DataType
       def dataType: DataType = StringType

这是什么意思?

共有1个答案

郭兴平
2023-03-14
import org.apache.spark.sql.types.{DataType}
 类似资料:
  • 为了了解如何在python中导出spark sql dataframe,我参考了以下链接 null

  • 当我创建如上图所示的UDF函数时,我得到任务序列化错误。只有在使用在集群部署模式下运行代码时,才会出现此错误。然而,它在Spark-Shell中运行良好。 我尝试添加,但没有解决问题。

  • 我在一个Dataframe中创建一个列,该列被设置为null(通过None),但是当发送到JDBC write时,我得到“Can't get JDBC type for null”。如有任何帮助,我们将不胜感激。 java.lang.IllegalArgumentException:无法在Qoptimized$class.foreach(indexedseqoptimized.scala:33)在

  • 我有一个包含各种列的表“用户”。其中一列是用户名。我决定通过迁移删除该列。这样做之后,我试图通过注册表单(创建操作)创建一个新用户,但收到用户名的未定义方法错误,我不确定为什么。@user.save 上引发错误。 下面是相关的代码,首先是用户控制器的动作,然后是实际提交的表单。 有什么想法吗?堆栈跟踪,根据要求:

  • 我正在尝试设置apache流。在下面的pom.xml中。当我添加 pom文件出现错误,表示:在这行发现多个注释:-无法传输org.glassfish.hk2:hk2-utils:jar:2.4.0-b34 http://maven.apache.org/xsd/maven-4.0.0.xsd“>4.0.0com.IoT.app.Kafka iot-kafka-producer 1.1.0 IoT

  • 当我试图为我们的一个复杂问题创建一个UDAF时,我决定从一个基本的UDAF开始,该UDAF按原样返回列。既然我是刚开始使用SQL/Scala的,有人能帮我指出我的错误吗。 代码如下: 导入org.apache.spark.sql.expressions.mutableaggregationbuffer导入org.apache.spark.sql.expressions.userdefinedagg