当前位置: 首页 > 知识库问答 >
问题:

在Spark SQL for Azure Databricks中创建用户定义(非临时)函数

衡子安
2023-03-14

也许这很愚蠢,我是一名Microsoft SQL/C开发人员,以前从未真正使用过任何其他IDE/编写的JAVA/SCALA。我正在将一些Azure SQL查询迁移到Azure Databricks解决方案。

似乎没有等效的TSQLDATEDIFF_BIG函数(https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)

你找到的解决方案是——编写你自己的UDF。

这是我在SCALA笔记本上做的(见下文),对于一个临时函数来说效果很好。(https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)

这是我找到的最有用的样本https://github.com/johnmuller87/spark-udf.

有相当多的临时函数示例,但我没有发现针对非JAVA/SCALA开发人员的永久函数的示例

我安装了SBT(Windows的最新版本-https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Windows.html)我还安装了Intellj

我运行了为IBAN示例构建的SBT,但在将JAR上传到Clusterd并注册函数后,我无法获得SQL函数。

CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension

SELECT ValidateIBAN('NL20INGB0001234567')

错误总是“SQL语句中的错误:AnalysisException:UDF/UDAF/UDTF'com.ing.wbaa.spark.UDF.ValidateIBAN'没有处理程序;第1行位置7”

//import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;

// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))

实际上,我需要的是——如何将SCALA笔记本转换为SQL函数,以便在Azure Databricks群集5.4版(包括Apache Spark 2.4.3、SCALA 2.11)上的永久SQL视图中使用它

  • 实施什么课程
  • 实现什么方法(在c#中重写)-也有关于HIVE或SPARK的不同文章
  • 如何设置SBT Build或以任何其他方式在Java存档中编译它,以便成功创建和运行SQL函数(仅在SQL中,不在pyhton代码中,也不在scala代码中-在SQL笔记本中)

谢谢你的帮助

共有2个答案

崔宜修
2023-03-14

Spark不提供任何持续时间超过一次Spark会话的永久性功能(Databricks——用Databricks行话创建永久用户定义函数(UDF)或集群生存期)。如果需要长时间运行的火花会话(仅SQL部分),可以考虑将这些UDF添加到蜂箱中,并从SCAK调用它们。否则(想想瞬态集群),每次启动集群时都需要重新添加它。

UDF的代码是非最优的:不处理空/空值/它将引发异常

对于基本(标准)火花UDF,请参阅https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html不需要真正的接口(不像Hive)

关于:SQL功能(仅SQL)/SBT:

如果你真的需要它(对于这个简单的用例)https://github.com/geoHeil/sparkSimpleProjectTemplate.g8可能是你的一个例子。

但是对于这段代码,不需要额外的依赖项。创建一个包含

此外,总是考虑使用火花本机(催化剂优化)功能。SPARK SQl中的DATEDIFF regular DATEDIFF可能已经完成了DATEDIFF big需要完成的许多工作,还可以减去普通时间戳类型的列。如果我对它的理解是正确的,那么只需要将输出格式化为所需的粒度就不存在了(即t-SQL函数将提供现成的输出),可以通过使用不同的函数嵌套它来实现,例如:

  • 或手动分割返回的差异
汝弘深
2023-03-14

您正在引用的数据库中的CREATE函数语句实际上是一个Hive命令,而不是Spark,它期望UDF类是Hive UDF。

这也是出现“没有UDF/UDAF/UDTF处理程序”错误的原因。您链接的示例实现了Spark UDF,而您需要的是实现Hive UDF。

要创建配置单元UDF,需要实现一个扩展类org的类。阿帕奇。hadoop。蜂箱ql.exec。UDF并实现一个名为evaluate的函数。在你的情况下,整个班级应该是这样的:

class GetTimestampDifference extends UDF {

  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

}

然后需要将其编译为JAR文件,将其复制到DataRicks文件系统的某个位置,并使用与之前相同的命令创建永久函数(假设保留IBAN示例的名称空间):

CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'

SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))

假设您仍在修改刚开始的IBAN示例项目,为了创建jar文件,您必须将以下包依赖项添加到构建中。sbt文件:

"org.apache.spark" %% "spark-hive" % "2.4.3"
 类似资料:
  • 问题内容: 我正在处理大量旧数据(从平面文件db转换),其中字段的格式设置为输入记录的年份的最后2位,然后是4位的增量… 例如,1998年创建的第三条记录将为“ 980003”,而2004年创建的第十一条记录将为“ 040011”。 我无法更改这些值- 它们通过他们的公司存在,已经在州,客户等中注册。我知道将年份和其余年份分隔到单独的列中会很好,但这是不可能的。我什至不能真正做到“内部”,因为每一

  • 问题内容: 有客观的更好的方法在bash脚本中创建临时文件吗? 我通常只要给他们起名就可以使用它们,例如tempfile-123,因为脚本结束后它将被删除。除了覆盖当前文件夹中可能的tempfile-123之外,这样做是否有其他缺点?还是以更谨慎的方式创建临时文件有什么好处? 问题答案: 该手册页解释了它相当好: 传统上,许多shell脚本使用pid作为后缀来命名程序名称,并将其用作临时文件名。这

  • 问题内容: 经过一番搜索和阅读文档后,很明显,您可以在SQL Server中编写用户定义的函数,这些函数被标记为确定性或非确定性的函数,具体取决于主体中使用的内置函数。 RAND()列在非确定性函数下(请参阅msdn article)。那为什么不能在函数中使用它呢? 问题答案: 因为它有副作用。 函数中不允许有副作用的构造。它的副作用是更改某些内部状态,以跟踪上次发布的值。 我认为您可以通过将其包

  • 嗨,我一直试图使一个自定义损失函数在kerasdice_error_coefficient。它有它的实现在张量板和我尝试使用相同的函数在keras与张量流但它不断返回一个NoneType当我使用model.train_on_batch或model.fit在那里,因为它给适当的值时,使用在模型中的指标...能不能请人帮帮我我该怎么办?我尝试过跟随像Keras-FCN这样的库,在那里他使用了自定义损失

  • 我们有一个单节点Cassandra集群(Apache),在AWS上有2个VCPU和大约16 GB的RAM。我们有大约28 GB的数据上传到Cassandra。 现在Cassandra可以很好地使用主键进行选择和分组查询,但是当使用用户定义的函数在非主键上使用聚合函数时,它会给出一个超时。 为了详细说明-我们对3年数据的年份、月份和日期进行了分区。现在,例如,如果两列是-Bill_ID和Bill_A

  • 在我的数据框架中,我有一个复杂的数据结构,我需要处理它来更新另一列。我尝试的方法是使用UDF。但是,如果有更简单的方法可以做到这一点,请随意回答。 所讨论的数据帧结构是: 我试图解决的问题是在有闪烁时更新id列。闪烁发生在id列从id到另一个多次更改时;例如,这将是闪烁。 将和连接起来,如如何构造数据部分所示 告诉我们的是,,需要通过将更改为来更新,