Question:

Spark DataFrame filter function throws a "Task not serializable" exception

柴高岑
2023-03-14
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("test data frame")
  .master("local[*]")
  .getOrCreate()

val user_seq = Seq(
  Row(1, "John", "London"),
  Row(1, "Martin", "New York"),
  Row(1, "Abhishek", "New York")
)
val user_schema = StructType(
  Array(
    StructField("user_id", IntegerType, true),
    StructField("user_name", StringType, true),
    StructField("user_city", StringType, true)
  ))

var user_df = spark.createDataFrame(spark.sparkContext.parallelize(user_seq), user_schema)
// the filter below is what triggers the "Task not serializable" exception
var user_rdd = user_df.filter((item) => {
  return item.getString(2) == "New York"
})
user_rdd.count()


objc[48765]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x1059db4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x105a5f4e0). One of the two will be used. Which one is undefined.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/07/18 20:10:09 INFO SparkContext: Running Spark version 2.4.6
20/07/18 20:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/07/18 20:10:09 INFO SparkContext: Submitted application: test data frame
20/07/18 20:10:09 INFO SecurityManager: Changing view acls groups to: 
20/07/18 20:10:09 INFO SecurityManager: Changing modify acls groups to: 
20/07/18 20:10:12 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/07/18 20:10:12 INFO ContextCleaner: Cleaned accumulator 0
20/07/18 20:10:13 INFO CodeGenerator: Code generated in 170.789451 ms
20/07/18 20:10:13 INFO CodeGenerator: Code generated in 17.729004 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:872)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:871)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:871)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at DataFrameTest$.main(DataFrameTest.scala:65)
    at DataFrameTest.main(DataFrameTest.scala)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
    - object not serializable (class: java.lang.Object, value: java.lang.Object@cec590c)
    - field (class: DataFrameTest$$anonfun$1, name: nonLocalReturnKey1$1, type: class java.lang.Object)
    - object (class DataFrameTest$$anonfun$1, <function1>)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 5)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 48 more
20/07/18 20:10:13 INFO SparkContext: Invoking stop() from shutdown hook
20/07/18 20:10:13 INFO SparkUI: Stopped Spark web UI at http://192.168.31.239:4040
20/07/18 20:10:13 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/07/18 20:10:13 INFO MemoryStore: MemoryStore cleared
20/07/18 20:10:13 INFO BlockManager: BlockManager stopped
20/07/18 20:10:13 INFO BlockManagerMaster: BlockManagerMaster stopped
20/07/18 20:10:13 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/07/18 20:10:13 INFO SparkContext: Successfully stopped SparkContext
20/07/18 20:10:13 INFO ShutdownHookManager: Shutdown hook called
20/07/18 20:10:13 INFO ShutdownHookManager: Deleting directory /private/var/folders/33/3n6vtfs54mdb7x6882fyqy4mccfmvg/T/spark-3e071448-7ad7-47b8-bf70-68ab74721aa2

Process finished with exit code 1

1 answer

钱振
2023-03-14

Remove the return keyword from the line below. Inside a Scala lambda, return is compiled as a non-local return, so the closure captures a control-flow marker object (the nonLocalReturnKey1$1 field of type java.lang.Object shown in the serialization stack), and that object is not serializable, which is why Spark cannot ship the filter function to the executors.

Change the following code

var user_rdd = user_df.filter((item)=>{
    return item.getString(2) == "New York"
})

to the single line below:

var user_rdd = user_df.filter(_.getString(2) == "New York")

or filter on the column directly:

user_df.filter($"user_city" === "New York").count
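If you prefer to keep the block-style lambda, deleting only the return also works (a minimal sketch of the same filter, reusing the names from the question; the last expression of the block becomes the lambda's result, so no non-local-return object is captured):

var user_rdd = user_df.filter((item) => {
  item.getString(2) == "New York"
})
user_rdd.count()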

import spark.implicits._   // needed for .toDF and the $"..." column syntax

val df = Seq((1,"John","London"),(1,"Martin","New York"),(1,"Abhishek","New York"))
  .toDF("user_id","user_name","user_city")

df.filter($"user_city" === "New York").count
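An equivalent Column-based filter can also be written with org.apache.spark.sql.functions.col, which needs no implicits and likewise avoids shipping a JVM closure, since Catalyst evaluates the comparison (a minimal sketch against the df defined above):

import org.apache.spark.sql.functions.col

df.filter(col("user_city") === "New York").count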