当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中强制进行数据帧评估

江敏学
2023-03-14

有时(例如用于测试和bechmark)我想强制执行在DataFrame上定义的转换。AFAIK调用像count这样的操作并不能确保所有都被实际计算,show可能只计算所有的子集(参见下面的示例)

我的解决方案是使用df将数据帧写入HDFS。写saveAsTable,但这会将我的系统与我不想再保留的表“混在一起”。

那么,触发数据帧评估的最佳方式是什么呢?

编辑:

请注意,最近还讨论了火花开发者列表:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html

我举了一个小例子,说明数据帧上的计数并不能评估所有内容(使用Spark 1.6.3和Spark master进行测试=local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

使用相同的逻辑,这里的示例show不计算所有行:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

编辑2:对于Eliasah:例外是这样的:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.

共有3个答案

全流觞
2023-03-14

看来df.cache.count是要走的路:

scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

scala> val df = sc.parallelize(1 to 1000).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.withColumn("test",myUDF($"id")).show(10)
[rdd_51_0]
+---+----+
| id|test|
+---+----+
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
| 10|  10|
+---+----+
only showing top 10 rows

scala> df.withColumn("test",myUDF($"id")).count
res13: Long = 1000

scala> df.withColumn("test",myUDF($"id")).cache.count
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
.
.
.
Caused by: java.lang.RuntimeException

来源

毕嘉
2023-03-14

我想,只需从数据帧中获取一个底层rdd并触发对其的操作,就可以实现您想要的。

df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions
衡子琪
2023-03-14

有点晚了,但最根本的原因是:count对RDD和数据帧的作用不一样。

DataFrame中有一个优化,因为在某些情况下,您不需要加载数据即可实际知道它拥有的元素数(尤其是在您的情况下,不涉及数据洗牌)。因此,调用count时具体化的DataFrame不会加载任何数据,也不会传递到您的异常抛出中。您可以通过定义自己的DefaultSourceRelation轻松地进行实验,并看到在DataFrame上调用count将始终结束在方法buildScan中,不需要,无论您选择了多少列(参见org.apache.spark.sql.sources.interfaces以了解更多)。这实际上是一个非常有效的优化;-)

然而,在RDD中没有这样的优化(这就是为什么人们应该尽可能使用数据帧的原因)。因此,RDD上的计数执行所有沿袭,并返回组成任何分区的所有大小迭代器的总和。

正在调用数据帧。count进入第一个解释,但调用数据帧。rdd。计数进入第二步,因为您确实从数据帧构建了一个RDD。请注意,调用dataframe。缓存()。count(计数)强制数据帧具体化,因为您需要Spark来缓存结果(因此需要加载所有数据并对其进行转换)。但它确实有缓存数据的副作用。。。

 类似资料:
  • 问题内容: 如何在spark数据帧中强制转换结构数组? 让我通过一个例子来说明我要做什么。我们将从创建一个数据框开始,该数据框包含行和嵌套行的数组。我的整数尚未在数据框中强制转换,它们已创建为字符串: 这是创建的数据框的架构: 我想做的是将所有可以为整数的字符串都转换为整数。我尝试执行以下操作,但没有成功: 我有以下异常: 任何人都有正确的查询将所有值转换为INTEGER吗?我将不胜感激。 非常感

  • 我正在使用数据库。假设我有两个Spark Dataframes(我正在使用PySpark): < li>df_source < li>df_target 如果df_source具有以下模式: df_target具有以下架构: 如何有效地创建另一个数据帧,df_final其中可以将df_source中的(null = true/false)属性强制到df_target? 我尝试了以下方法: 通过这种

  • 我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。 我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。

  • 问题内容: 我如何强制Spark执行对map的调用,即使它认为由于其惰性求值而无需执行该调用? 我已经尝试过使用map调用了,但是仍然不能解决问题。我的地图方法实际上将结果上传到HDFS。因此,它不是无用的,但Spark认为是。 问题答案: 简短答案: 要强制Spark执行转换,您需要要求一个结果。有时,一个简单的动作就足够了。 TL; DR: 好的,让我们回顾一下 操作。 支持两种类型的操作:

  • 我知道Spark Streaming会生成成批的RDD,但我想积累一个大数据帧,随着每批数据的更新而更新(通过在末尾添加新的数据帧)。 有没有办法像这样访问所有历史流数据? 我看过mapWithState(),但没有看到它专门积累数据帧。

  • 环境:Scala、spark、结构化流媒体、Kafka 我有一个来自Kafka流的DF,具有以下模式 DF: 我希望使用spark并行处理每一行,并使用 我需要从值列中提取值到它自己的数据框中进行处理。我有困难与Dataframe通用行对象... 是否有办法将每个执行器中的单行转换为自己的Dataframe(使用固定模式?)在固定的地点写字?有没有更好的方法来解决我的问题? 编辑澄清: DF im