Question:

NotSerializableException when integrating SQL and Spark Streaming

昌砚
2023-03-14

I am getting a NotSerializableException when integrating Spark SQL and Spark Streaming.

My source code:

public static void main(String args[]) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());
    int numThreads = Integer.parseInt(args[3]);
    Map<String,Integer> topicMap = new HashMap<String,Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String,String> data = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();

    JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
            public Person call(Tuple2<String,String> v1) throws Exception {
                String[] stringArray = v1._2.split(",");
                Person person = new Person();
                person.setName(stringArray[0]);
                person.setAge(stringArray[1]);
                return person;
            }

        });


    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>,Void>() {
        public Void call(JavaRDD<Person> rdd) {

            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);

            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");

            List<String> males = new ArrayList<String>();

            males = names.map(new Function<Row,String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });
    jssc.start();
    jssc.awaitTermination();
}

The JavaSQLContext is also declared outside the foreachRDD loop, but I still get the NotSerializableException:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 20 more

I would appreciate any suggestions.
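
For reference, here is a minimal, hypothetical sketch (not taken from either answer below; the RowToName name is invented for illustration) of one commonly suggested way to avoid the capture: define the row-mapping function as a static nested class instead of an anonymous inner class, so it carries no implicit reference to the enclosing foreachRDD Function and the JavaSQLContext that Function captured.

    // Sketch only: a static nested mapper has no hidden reference to the
    // enclosing anonymous Function, so serializing this closure no longer
    // tries to serialize the captured JavaSQLContext.
    static class RowToName implements Function<Row, String> {
        public String call(Row row) {
            return row.getString(0);
        }
    }

    // inside foreachRDD, replacing the anonymous Function<Row, String>:
    List<String> males = names.map(new RowToName()).collect();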

2 answers in total

厉文栋
2023-03-14

Here is the working code:

package com.basic.spark;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class NumberCount implements Serializable {

    transient SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    transient JavaSparkContext jc = new JavaSparkContext(sparkConf);
    transient JavaStreamingContext jssc_1 = new JavaStreamingContext(jc, new Duration(1000));
    transient JavaSQLContext sqlContext = new JavaSQLContext(jc);
    transient Producer producer = configureKafka();

    public static void main(String args[]) {
        (new NumberCount()).job_1(args);
    }

    public void job_1(String...args) {
        jssc_1.addStreamingListener(new WorkCountMonitor());
        int numThreads = Integer.parseInt(args[3]);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String,String> data = KafkaUtils.createStream(jssc_1, args[0], args[1], topicMap);
        data.window(new Duration(10000), new Duration(2000));

        JavaDStream<String> streamData = data.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String,String> v1) {
                return v1._2;
            }
        });

        streamData.foreachRDD(new Function<JavaRDD<String>,Void>() {
            public Void call(JavaRDD<String> rdd) {

                if (rdd.count() < 1)
                    return null;

                try {
                    JavaSchemaRDD eventSchema = sqlContext.jsonRDD(rdd);
                    eventSchema.registerTempTable("event");
                    System.out.println("all data");
                    JavaSchemaRDD names = sqlContext.sql("SELECT deviceId, count(*) FROM event group by deviceId");
                    System.out.println("afterwards");

//                    List<Long> males = new ArrayList<Long>();
//
//                    males = names.map(new Function<Row,Long>() {
//                        public Long call(Row row) {
//                            return row.getLong(0);
//                        }
//                    }).collect();
//                    System.out.println("before for");
//                    ArrayList<KeyedMessage<String, String>> data = new ArrayList<KeyedMessage<String, String>>();
//                    for (Long name : males) {
//                        System.out.println("**************"+name);
//                        writeToKafka_1(data, String.valueOf(name));
//                    }
//                    producer.send(data);

                    List<String> deviceDetails = new ArrayList<String>();

                    deviceDetails = names.map(new Function<Row,String>() {
                        public String call(Row row) {
                            return row.getString(0) +":" + row.getLong(1);
                        }
                    }).collect();

                    System.out.println("before for");
                    ArrayList<KeyedMessage<String, String>> data = new ArrayList<KeyedMessage<String, String>>();
                    for (String name : deviceDetails) {
                        System.out.println("**************"+name);
                        writeToKafka_1(data, name);
                    }
                    producer.send(data);

                } catch (Exception e) {
                    System.out.println("#ERROR_1#   #" + rdd);
                    e.printStackTrace();
                }

                return null;
            }
        });
        jssc_1.start();
        jssc_1.awaitTermination();
    }

    public Producer<String, String> configureKafka() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", "2");
        props.put("request.required.acks", "0");
        props.put("producer.type", "sync");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        return producer;
    }

    public void writeToKafka_1(ArrayList<KeyedMessage<String,String>> list, String msg) {
        list.add(new KeyedMessage<String,String>("my-replicated-topic-1", "", msg));
    }
}
濮阳默
2023-03-14

Have you implemented the Serializable interface in your Person POJO class? You could also try declaring topicMap as final.
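
For illustration, a minimal sketch of what that suggestion could look like (the fields follow the question's code, with age kept as a String to match setAge(stringArray[1]); the getters/setters are assumed):

import java.io.Serializable;

// Sketch only: a serializable bean, so Spark can ship Person instances inside
// task closures, with the bean getters that applySchema(rdd, Person.class) needs.
public class Person implements Serializable {
    private String name;
    private String age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
}

// ...and in main(), topicMap declared final, as suggested above:
final Map<String, Integer> topicMap = new HashMap<String, Integer>();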
