当前位置: 首页 > 知识库问答 >
问题:

带有滞后窗口函数的火花任务不可串行化

连德水
2023-03-14

我注意到,当我在DataFrame上使用窗口函数后,如果我用函数调用map()时,Spark会返回一个“Task not serializable”异常这是我的代码:

val hc:org.apache.spark.sql.hive.HiveContext =
    new org.apache.spark.sql.hive.HiveContext(sc)

import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def f() : String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column = 
    lag($"name",1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1","S1"), P("N2","S2"), P("N2","S2"))
val data_frame: org.apache.spark.sql.DataFrame = 
    hc.createDataFrame(sc.parallelize(lista))

df.withColumn("lag_result", lag_result).map(x => f)

// This works
// df.withColumn("lag_result", lag_result).map{ case x =>
//     def f():String = "test";f}.collect

这是堆栈跟踪:

异常:任务不可序列化在org.apache.spark.util.ClosureCleaner$.EnsureClealizable(ClosureCleaner.scala:304)在org.apache.spark.util.ClosureCleaner$.org.apache.spark.util.ClosureCleaner$$Clean(ClosureCleaner.scala:294)在org.apache.spark.util.ClosureCleaner$.Clean(ClosureCleaner.scala:122)在org.apache.spark.sparkcontext.clean(Sparkcontext.scala:2055)在org.apache.spark.scala:2055。rdd.rdd$$anonfun$map$1.apply(Rdd.scala:324)在org.apache.spark.rdd.rdd$$anonfun$map$1.apply(Rdd.scala:323)在...及更多原因:java.io.NotSerializableException:org.apache.spark.sql.列序列化堆栈:

  • 对象不可序列化(类:org.apache.spark.sql.Column,值:'lag(名称,1,null)windowspecdefinition(姓氏,UnspecifiedFrame))
  • 字段(类:$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$IWC$$)等等,名称:lag_result,类型:class org.apache.spark.sql.column)...及更多

共有1个答案

阎安邦
2023-03-14

lag返回不可序列化的O.A.S.SQL.Column。同样的事情也适用于windowspec。在交互模式下,这些对象可以作为map的闭包的一部分包含:

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
df: org.apache.spark.sql.DataFrame = [x: string, y: int]

scala> val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097

scala> val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> def f(x: Any) = x.toString
f: (x: Any)String

scala> df.select(lag_y).map(f _).first
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)

一个简单的解决方案是将两者都标记为瞬态:

scala> @transient val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470

scala> @transient val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> df.select(lag_y).map(f _).first
res1: String = [null]     
 类似资料:
  • 我有一门课: 它运行得很好,但抛出了一个例外:在我对RDD的映射做了一个小更改之后: 我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?

  • 我在Scala中查看幻灯片函数中的Spark。

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 我正在查看Spark SQL中的Spark DataFrame的窗口幻灯片函数。 我有一个包含列、和的数据表。

  • 将现有应用程序从Spark 1.6移动到Spark 2.2*(最终)会导致错误“org.apache.spark.SparkExctive:任务不可序列化”。我过于简化了我的代码,以演示同样的错误。代码查询拼花文件以返回以下数据类型:“org.apache.spark.sql.数据集[org.apache.spark.sql.行]”我应用一个函数来提取字符串和整数,返回字符串。一个固有的问题与Sp

  • 我试图在Spark DataFrame上使用RangeBetween对Long类型的列执行窗口函数,但窗口的结果不正确。我做错什么了吗? 第一列是事件的时间戳(字符串,我们在实践中不会使用它),第二列是时间戳对应的unix时间,单位为10E-5秒。 现在,我想计算当前行的窗口中的事件数。例如,在3小时窗口中,我做: 正确返回: 和这个ISSU有关吗?[SPARK-19451][SQL]rangeB