有时(例如用于测试和bechmark)我想强制执行在DataFrame上定义的转换。AFAIK调用像count
这样的操作并不能确保所有列
都被实际计算,show
可能只计算所有行
的子集(参见下面的示例)
我的解决方案是使用df将数据帧写入HDFS。写saveAsTable,但这会将我的系统与我不想再保留的表“混在一起”。
那么,触发数据帧评估的最佳方式是什么呢?
编辑:
请注意,最近还讨论了火花开发者列表:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html
我举了一个小例子,说明数据帧上的计数并不能评估所有内容(使用Spark 1.6.3和Spark master进行测试=local[2]
):
val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception
使用相同的逻辑,这里的示例show不计算所有行:
val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception
编辑2:对于Eliasah:例外是这样的:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.
看来df.cache.count
是要走的路:
scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))
scala> val df = sc.parallelize(1 to 1000).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]
scala> df.withColumn("test",myUDF($"id")).show(10)
[rdd_51_0]
+---+----+
| id|test|
+---+----+
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
| 10| 10|
+---+----+
only showing top 10 rows
scala> df.withColumn("test",myUDF($"id")).count
res13: Long = 1000
scala> df.withColumn("test",myUDF($"id")).cache.count
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
.
.
.
Caused by: java.lang.RuntimeException
来源
我想,只需从数据帧中获取一个底层rdd并触发对其的操作,就可以实现您想要的。
df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions
有点晚了,但最根本的原因是:count对RDD和数据帧的作用不一样。
在
DataFrame
中有一个优化,因为在某些情况下,您不需要加载数据即可实际知道它拥有的元素数(尤其是在您的情况下,不涉及数据洗牌)。因此,调用count
时具体化的DataFrame
不会加载任何数据,也不会传递到您的异常抛出中。您可以通过定义自己的DefaultSource
和Relation
轻松地进行实验,并看到在DataFrame
上调用count
将始终结束在方法buildScan
中,不需要列
,无论您选择了多少列(参见org.apache.spark.sql.sources.interfaces
以了解更多)。这实际上是一个非常有效的优化;-)
然而,在RDD中没有这样的优化(这就是为什么人们应该尽可能使用数据帧的原因)。因此,RDD上的计数执行所有沿袭,并返回组成任何分区的所有大小迭代器的总和。
正在调用数据帧。count进入第一个解释,但调用数据帧。rdd。计数进入第二步,因为您确实从数据帧构建了一个RDD。请注意,调用dataframe。缓存()。count(计数)强制数据帧具体化,因为您需要Spark来缓存结果(因此需要加载所有数据并对其进行转换)。但它确实有缓存数据的副作用。。。
问题内容: 如何在spark数据帧中强制转换结构数组? 让我通过一个例子来说明我要做什么。我们将从创建一个数据框开始,该数据框包含行和嵌套行的数组。我的整数尚未在数据框中强制转换,它们已创建为字符串: 这是创建的数据框的架构: 我想做的是将所有可以为整数的字符串都转换为整数。我尝试执行以下操作,但没有成功: 我有以下异常: 任何人都有正确的查询将所有值转换为INTEGER吗?我将不胜感激。 非常感
我正在使用数据库。假设我有两个Spark Dataframes(我正在使用PySpark): < li>df_source < li>df_target 如果df_source具有以下模式: df_target具有以下架构: 如何有效地创建另一个数据帧,df_final其中可以将df_source中的(null = true/false)属性强制到df_target? 我尝试了以下方法: 通过这种
我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。 我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。
问题内容: 我如何强制Spark执行对map的调用,即使它认为由于其惰性求值而无需执行该调用? 我已经尝试过使用map调用了,但是仍然不能解决问题。我的地图方法实际上将结果上传到HDFS。因此,它不是无用的,但Spark认为是。 问题答案: 简短答案: 要强制Spark执行转换,您需要要求一个结果。有时,一个简单的动作就足够了。 TL; DR: 好的,让我们回顾一下 操作。 支持两种类型的操作:
我知道Spark Streaming会生成成批的RDD,但我想积累一个大数据帧,随着每批数据的更新而更新(通过在末尾添加新的数据帧)。 有没有办法像这样访问所有历史流数据? 我看过mapWithState(),但没有看到它专门积累数据帧。
环境:Scala、spark、结构化流媒体、Kafka 我有一个来自Kafka流的DF,具有以下模式 DF: 我希望使用spark并行处理每一行,并使用 我需要从值列中提取值到它自己的数据框中进行处理。我有困难与Dataframe通用行对象... 是否有办法将每个执行器中的单行转换为自己的Dataframe(使用固定模式?)在固定的地点写字?有没有更好的方法来解决我的问题? 编辑澄清: DF im