Question:

Spark Streaming: broadcast variable, java.lang.ClassCastException

尉迟明贤
2023-03-14
spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar

When I run this, I get java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator. The same code with a hardcoded ArrayBuffer (BroadcastStreamTest2 below) works fine, so I assume it has something to do with reading the static file resource... Does anyone know what I might be doing wrong? Any help is appreciated.


    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BroadcastStreamTest1 {

        def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            // Read the static file once on the driver and collect it locally.
            val content = streamingContext.sparkContext
                .textFile("hdfs:///data/someTextFile.txt")
                .collect()
                .toBuffer[String]

            // Broadcast the collected lines and print them on the driver.
            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }


    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable

    object BroadcastStreamTest2 {

        def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf()
            val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

            // Build the broadcast content in memory instead of reading it from HDFS.
            val content = new mutable.ArrayBuffer[String]
            (1 to 50).foreach(i => content += "line" + i)

            val broadCastVar = streamingContext.sparkContext.broadcast(content)
            broadCastVar.value.foreach(line => println(line))

            streamingContext.start()
            streamingContext.awaitTermination()
        }
    }

16/04/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net): java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at net.meetrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14)
        at net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
        at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
        ... 30 more 

1 answer

王修为
2023-03-14

The cause was some kind of conflict with the jar file I submitted.

Without setting

spark.executor.userClassPathFirst=true

it works. Unfortunately, I was not able to pin down the exact root cause. A ClassCastException between two identically named classes usually means the class was loaded by two different class loaders, and spark.executor.userClassPathFirst=true can trigger exactly that when the application jar bundles its own copies of Spark or Scala classes.
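
A minimal sketch of the working submission, assuming nothing else needs to change: it is the original command with only the userClassPathFirst setting dropped.

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client current.jar

If the conflict does come from Spark classes bundled into current.jar, a common remedy is to mark the Spark dependencies as "provided" so they are excluded from the assembled jar in the first place. A hypothetical build.sbt excerpt (the version number is an assumption, matching the Spark 1.6-era stack trace above):

    // Hypothetical build.sbt line: "provided" keeps Spark and its Scala
    // dependencies out of the assembled jar, so there are no duplicate
    // classes for userClassPathFirst to load from a second class loader.
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"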
