当前位置: 首页 > 知识库问答 >
问题:

集成Spark SQL和Spark Streaming时出现不可序列化异常

公良文彬
2023-03-14

这是我的源代码,其中Im从服务器端获取一些数据,服务器端不断生成数据流。然后,对于每个RDD,我应用SQL模式,一旦创建了这个表,我就会尝试从这个数据流中选择一些东西。

    List<String> males = new ArrayList<String>();
    JavaDStream<String> data = streamingContext.socketTextStream("localhost", (port));
    data.print();
    System.out.println("Socket connection established to read data from Subscriber Server");
    JavaDStream<SubscriberData> streamData = data
            .map(new Function<String, SubscriberData>() {
                public SubscriberData call(String record) {
                    String[] stringArray = record.split(",");
                    SubscriberData subscriberData = new SubscriberData();
                    subscriberData.setMsisdn(stringArray[0]);
                    subscriberData.setSubscriptionType(stringArray[1]);

                    subscriberData.setName(stringArray[2]);
                    subscriberData.setGender(stringArray[3]);
                    subscriberData.setProfession(stringArray[4]);
                    subscriberData.setMaritalStatus(stringArray[5]);


                    return subscriberData;

                }

            });

    streamData.foreachRDD(new Function<JavaRDD<SubscriberData>,Void>(){
        public Void call(JavaRDD<SubscriberData> rdd){

    JavaSQLContext sqlContext = new JavaSQLContext(sc);
    JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd,SubscriberData.class);

    subscriberSchema.registerAsTable("SUBSCRIBER_DIMENSION");
    System.out.println("all data");
    JavaSchemaRDD names = sqlContext.sql("SELECT msisdn FROM SUBSCRIBER_DIMENSION WHERE GENDER='Male'");
    System.out.println("afterwards");

    List<String> males = new ArrayList<String>();

     males = names.map(new Function<Row, String>() {
        public String call(Row row) {
            return row.getString(0);
        }
    }).collect();
    System.out.println("before for");
    for (String name : males) {
        System.out.println(name);
            }
    return null;
    }
    });
streamingContext.start();

但是它抛出了这个可序列化的异常,尽管我使用的类确实实现了序列化。

    14/11/06 12:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1415258720000 ms.1
 org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:75)
at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:206)
at com.hp.tbda.rta.SubscriberClient$2.call(SubscriberClient.java:1)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 20 more

共有1个答案

乐刚毅
2023-03-14

SparkContext不可序列化,因为它只能在驱动程序上使用,不应包含在任何闭包中。我担心目前对Spark流媒体上SQL的支持仅处于研究水平。有关详细信息,请参阅Spark峰会的演示。

要创建男性订阅者ID的预期RDD,可以使用映射和过滤器:

maleSubscribers = subscribers.filter(subsc => subcs.getGender == "Male")
                             .map(subsc => subsc.getMsisdn)
 类似资料:
  • 除了集成SparkSQL和Spark Streaming时不可序列化的异常 我的源代码 JavaSQLContext也在ForeachRDD循环之外声明,但我仍然得到了NonSerializableException 23年12月14日23:49:38错误JobScheduler:运行作业流作业1419378578000 ms.1 org.apache.spark时出错。SparkExceptio

  • 在反序列化包含JSON序列化LocalDate对象的JSON字符串时,我看到了一个异常(有关JSON片段,请参阅本问题末尾)。 这就是我反序列化的方式: 我看到以下异常消息: 下面是我试图反序列化的JSON片段:

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序

  • 我试图在Windows7上使用Tomcat7添加ApacheSolr4.6。 另外,我将solr,solrj和log4j添加到tomcat lib文件夹中。我复制了solr web porlet到增殖和尝试启动服务器,但我得到异常。 谁能告诉我这个错误是怎么回事吗。

  • 我目前正在将一些代码从Jackson1.x迁移到Jackson2.5json映射器,遇到了一个1.x中没有的问题。 这是设置(参见下面的代码): 接口IPET 类Dog实现IPET IPET使用@jsonTypeInfo和@jsonSubtypes进行注释 类Human具有一个类型为IPet的属性,该属性使用@JSONSerialize(using=CustompetSerializer.clas

  • 问题内容: 我有一个包含三个字段的单一对象:两个字符串和一个 我想做的是将这些对象中的一个保存到文件中,但是我不断收到 我用来存储文件的代码: 未初始化drawable时,一切正常。在此先感谢您的帮助。 问题答案: java.io.NotSerializableException: android.graphics.drawable.BitmapDrawable 该消息看起来非常清晰- 字段中的特