当前位置: 首页 > 知识库问答 >
问题:

如何从udf调用文件系统

吴宝
2023-03-14

目标是为每个DataFrame行添加具有修改时间的列。

鉴于

val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())

+----+------------------------+
| id |        input_file_name |
+----+------------------------+
|  1 | hdfs://path/part-00001 |
|  2 | hdfs://path/part-00001 |
|  3 | hdfs://path/part-00002 |
+----+------------------------+

预期的

+----+------------------------+
| id |      modification_time |
+----+------------------------+
|  1 | 2000-01-01Z00:00+00:00 |
|  2 | 2000-01-01Z00:00+00:00 |
|  3 | 2000-01-02Z00:00+00:00 |
+----+------------------------+

我写了一个函数来获取修改时间

def getModificationTime(path: String): Long = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")

修改时间:长=1580708401253

...但它在查询中不起作用

def input_file_modification_time = udf((path: String) => getModificationTime(path))

data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)

组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段54.0中的任务0失败4次,最近的失败:阶段54.0中的任务0.3丢失(TID 408,srs-hdp-s1.dev.kontur.ru,executor 3):org。阿帕奇。火花SparkException:无法执行用户定义的函数($anonfun$input_file_MODIFY_time$1:(字符串)=

共有2个答案

宓季同
2023-03-14

注意为数据帧的每一行调用getModificationTime将对性能产生影响。

修改代码以一次性获取文件元数据

请检查下面的代码。

scala> val df = spark.read.format("parquet").load("/tmp/par")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> :paste
// Entering paste mode (ctrl-D to finish)

def getModificationTime(path: String): Long = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

// Exiting paste mode, now interpreting.

getModificationTime: (path: String)Long

scala> implicit val files = df.inputFiles.flatMap(name => Map(name -> getModificationTime(name))).toMap
files: scala.collection.immutable.Map[String,Long] = Map(file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_2.snappy.parquet -> 1588080295000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_3.snappy.parquet -> 1588080299000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_4.snappy.parquet -> 1588080302000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000.snappy.parquet -> 1588071322000)

scala> :paste
// Entering paste mode (ctrl-D to finish)

def getTime(fileName:String)(implicit files: Map[String,Long]): Long = {
 files.getOrElse(fileName,0L)
}

// Exiting paste mode, now interpreting.

getTime: (fileName: String)(implicit files: Map[String,Long])Long

scala> val input_file_modification_time = udf(getTime _)
input_file_modification_time: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))

scala> df.withColumn("createdDate",input_file_modification_time(input_file_name)).show
+---+-------------+
| id|  createdDate|
+---+-------------+
|  1|1588080295000|
|  2|1588080295000|
|  3|1588080295000|
|  4|1588080295000|
|  5|1588080295000|
|  6|1588080295000|
|  7|1588080295000|
|  8|1588080295000|
|  9|1588080295000|
| 10|1588080295000|
| 11|1588080295000|
| 12|1588080295000|
| 13|1588080295000|
| 14|1588080295000|
| 15|1588080295000|
| 16|1588080295000|
| 17|1588080295000|
| 18|1588080295000|
| 19|1588080295000|
| 20|1588080295000|
+---+-------------+
only showing top 20 rows


scala>

束飞捷
2023-03-14

问题是,spark在UDF中为空,因为它只存在于驱动程序上。另一个问题是HadoopConfiguration是不可序列化的,因此不能轻易地将其封装到udf中。但是有一个使用org的训练。阿帕奇。火花SerializableWritable

import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration

val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)

def getModificationTime(path: String, conf:SerializableWritable[Configuration]): Long = {
    org.apache.hadoop.fs.FileSystem.get(conf.value)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

def input_file_modification_time(conf:SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path,conf))

data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)
 类似资料:
  • 我使用expo下载了一张图片(a.jpg),代码如下: 文件成功保存在文件系统中。后来当我试图读取文件时,我得到一个错误,文件无法读取。用于读取文件的代码: 上面的代码返回文件无法读取的错误。fileInfo.exists是true,因为文件存在于文件系统中。 读取文件时出错: 如果我尝试读取一个文本文件(a.json),而不是jpg(a.jpg),那么一切都很好。所以,文件系统。readAsSt

  • 我想知道如何使用JAVA从SparkSQL中的领域特定语言(DSL)函数调用UDF函数。 我有UDF函数(仅举例): 我已经注册到sqlContext了 当我运行下面的查询时,我的UDF被调用,我得到一个结果。 我将使用Spark SQL中特定于域的语言的函数转换此查询,但我不确定如何进行转换。 我发现存在调用 UDF() 函数,其中其参数之一是函数 fnctn 而不是 UDF2。如何使用 UDF

  • 我已经在Ubuntu 14.04上安装了hadoop。每当我将文件从本地文件系统复制到HDFS时,我都会出现以下错误。 我使用这个命令: 我遇到的错误是: 我是Linux环境的新手。我不明白哪个文件不存在。

  • 问题内容: 与此处类似的问题,但在此处没有足够的评论要点。 根据最新的Spark 文档,可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何与sql 一起使用的示例,但还没有找到有关如何直接在DataFrame上使用a的任何示例。 op所提供的解决方案,在上面链接的问题上使用,根据Spark Java API文档,该解决方案将在Spark 2.0中删除。在那里,它

  • 问题内容: 我阅读了LKD 1中的一些段落,但 我无法理解以下内容: 从用户空间访问系统调用 通常,C库提供对系统调用的支持。用户应用程序可以从标准标头中提取函数原型,并与C库链接以使用您的系统调用(或库例程,后者又使用syscall调用)。但是,如果您只是编写了系统调用,则怀疑glibc是否已支持它! 幸运的是,Linux提供了一组宏,用于包装对系统调用的访问。它设置寄存器内容并发出陷阱指令。这

  • 问题内容: 我正在尝试将内容从Jenkinsfile中分离出来,以制作一个时髦的脚本。但是它无法调用这些脚本:这是代码: file.groovy 看起来Jenkinsfile能够调用file1.groovy但总是给我一个错误: 问题答案: 如果要从外部文件中获取可用的方法,则需要执行以下操作 在您的中,返回对方法的引用 编辑 似乎不是必需的 要么 正如@Olia所提到的 应该管用 这是有关的参考。