当前位置: 首页 > 知识库问答 >
问题:

apache spark中类(任务对象)的kryo序列化反序列化时返回null

鄂曦之
2023-03-14

我正在使用java spark API编写一些测试应用程序。我使用的是一个不扩展可序列化接口的类。因此,为了使应用程序正常工作,我使用kryo序列化器序列化类。但我在调试时观察到的问题是,在反序列化过程中,返回的类对象变为null,并反过来抛出一个null指针异常。这似乎是关闭的问题,事情是错误的,但不确定。因为我是新的这种系列化,我不知道从哪里开始挖掘。

下面是我正在测试的代码:

package org.apache.spark.examples;


import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;




/**
 * Spark application to test the Serialization issue in spark
 */
public class Test {

    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;

    public static void main(String[] args) {


        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";

        String master = "local";
        String jobName = "TestSerialization";
        String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
        String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";


        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
        // create the Spark context
        if(master.equals("local")){
            ssc = new JavaSparkContext("local", jobName,conf);
            //ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }
        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject= new NotSerializableJavaClass("Hi ");
        @SuppressWarnings({ "serial", "unchecked"})
        JavaRDD<String> classificationResults = testData.map(
                new Function<String, String>() {
                    @Override
                    public String call(String inputRecord) throws Exception {                   
                        if(!inputRecord.isEmpty()) {
                            //String[] pointDimensions = inputRecord.split(",");
                            String result = "";

                            try {
                                FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
                                PrintWriter outputFile = new PrintWriter(file); 
                                InetAddress ip;
                                ip = InetAddress.getLocalHost();
                                outputFile.println("IP of the server: " + ip);

                                result = notSerializableTestObject.testMethod(inputRecord);
                                outputFile.println("Result: " + result);

                                outputFile.flush();
                                outputFile.close();
                                file.close();

                            } catch (UnknownHostException e) {
                                e.printStackTrace();
                            }
                            catch (IOException e1) {
                                e1.printStackTrace();
                            } 

                            return result;
                        } else {
                            System.out.println("End of elements in the stream.");
                            String result = "End of elements in the input data";
                            return result;
                        }
                    }

                }).cache(); 

        long processedRecords = classificationResults.count();

        ssc.stop();
        System.out.println("sssssssssss"+processedRecords);
    }
}
package org.apache.spark.examples;

import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}

下面是我正在序列化的类:

package org.apache.spark.examples;

public class NotSerializableJavaClass {
    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }

    public String testMethod(String vartoAppend){
        return this.testVariable + vartoAppend;
    }
}

共有1个答案

单于皓轩
2023-03-14

这是因为spark.closure.serializer只支持Java序列化器。有关spark.closure.serializer的信息,请参阅http://spark.apache.org/docs/latest/configuration.html

 类似资料:
  • 我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。

  • 错误: java.lang.ClassNotFoundException:testprocedure.tp$3在java.net.URLClassLoader$1上运行(未知源)在java.net.URLClassLoader上运行(未知源)在java.security.accessController.doprivileged(本机方法)在java.net.URLClassLoader.find

  • 本文向大家介绍java对象的序列化和反序列化,包括了java对象的序列化和反序列化的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了java对象的序列化和反序列化,供大家参考,具体内容如下 1. 什么是序列化        将对象转换为字节流保存起来,比如保存到文件里,并在以后还原这个对象,这种机制叫做对象序列化。(补充一句:把对象保存到永久存储设备上称为持久化) 2. 怎么实现序列化

  • 上下文:我们使用Activiti作为流程引擎,使用Activiti-Rest作为应用程序的接口。由于这个问题与返回由Java序列化的对象的REST服务有关,所以我没有将其添加到标题中。 场景:我们有一个JBoss Wildfly实例,它包含一个EAR和一个包含类“ProcessContext”的模块(为了参考,我们将其称为X)。Activiti在这个EAR中运行,ServiceTasks(从进程中

  • 我已经开始将一个项目从使用Java标准日期迁移到Joda DateTime。 我的项目使用XML序列化将对象保存到XML文件中。在这个特殊的例子中,我有一个Item类,它有一个DateTime属性。 在某个时候,我正在初始化对象,包括像这样的DateTime属性: 我使用XMLEncoder使用辅助类序列化项目: 显然,日期时间被保存在xml中。。。但毫无价值: 显然,它没有保存任何东西,但不,它