当前位置: 首页 > 知识库问答 >
问题:

示例Java程序中的Spark UDF反序列化错误

李宁
2023-03-14

这个示例直接取自Spark示例代码,所以我不太清楚到底发生了什么。

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class TestSpark {

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .appName("testspark")
                .master("spark://0.0.0.0:7077")
                 // Yes this file exists, and contains this class
                .config("spark.jars", "out/artifacts/testspark_jar/testspark.jar")
                .getOrCreate();

        // This works
        spark.sql("SELECT 5 + 1").show();

        spark.udf().register("plusOne", (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);

        // This fails
        spark.sql("SELECT plusOne(5)").show();

    }

}

我在localhost上运行的Spark独立集群上运行这个。

工人始终失败:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2205)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2168)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1422)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2450)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1175)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:482)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:440)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我运行的是Java 11,使用的是Spark 3.0.1。

我确实发现了一个非常相似的问题,看起来它就是答案:java。lang.ClassCastException在远程服务器上的spark作业中使用lambda表达式

然而,在确保将TestSpark编译到提供给SparkSession的JAR中之后,我仍然会收到相同的错误。

任何帮助都将不胜感激。似乎Java/Scala边界上发生了一些事情,但我对Scala互操作的了解还不够,无法进一步分析。

共有1个答案

范宏大
2023-03-14

我已经链接的问题(在远程服务器上的spark job中使用lambda表达式的java.lang.ClassCastException)的答案是正确的。

因为我没有正确编译JAR,所以它再次被弄糊涂了(我想应该是有清单的,但不是百分之百确定)。

正确编译JAR后(使用mvn包而不是IDE),并在spark中提供JAR。jarsconfig属性,代码按预期工作。

 类似资料:
  • 我正在尝试使用网络库kryonet创建一个多人游戏,我得到了连接和发送字符串的工作,但现在我正在尝试发送对象。我想做的是发送一个数组列表,但它给了我这个错误。我也尝试只发送一个对象,它给了我同样的错误。 我尝试在Metor类中创建一个没有参数的构造函数,但这也不起作用 编辑:所以我得出的结论是,Kryonet在序列化slick2d图像类时有问题,或者与Kryonet和slick2d有不同的冲突。

  • 本文向大家介绍Java 序列化和反序列化实例详解,包括了Java 序列化和反序列化实例详解的使用技巧和注意事项,需要的朋友参考一下 Java 序列化和反序列化实例详解 在分布式应用中,对象只有经过序列化才能在各个分布式组件之间传输,这就涉及到两个方面的技术-发送者将对象序列化,接受者将对象反序列化,下面就是一个很好的例子! 1.实体-Employee 2.SerializeHelper 3.测试类

  • 问题内容: 我尝试过在Java和Android之间实现跨平台序列化。我使用了Serializable,并将我的代码在Android中与台式机Java放在同一软件包中。 来源:java-desktop序列化 资料来源:Android-反序列化 学生是一类,实现了Serializable。在桌面上,我将学生实例序列化为“ thestudent.dat”。我将此文件放在Android设备上的SD卡上,并

  • 主要内容:1 Java序列化和反序列化,2 Java序列化的优点,3 java.io.Serializable接口,4 Java ObjectOutputStream,5 Java ObjectInputStream,6 Java序列化的例子,7 Java反序列化的例子1 Java序列化和反序列化 Java中的序列化是一种将对象状态写入字节流的机制。它主要用于Hibernate,RMI,JPA,EJB和JMS技术。 序列化的反向操作称为反序列化,其中字节流被转换为对象。序列化和反序列化过程与平台

  • 上一小节我们学习了 Java 的输入输出流,有了这些前置知识点,我们就可以学习 Java 的序列化了。本小节将介绍什么是序列化、什么是反序列化、序列化有什么作用,Serializable 接口以及 Externalizable 接口,常用序列化工具介绍等内容。 1. 序列化与反序列化 序列化在计算机科学的数据处理中,是指将数据结构或对象状态转换成可取用格式,以留待后续在相同或另一台计算机环境中,能

  • 本文向大家介绍.net的序列化与反序列化实例,包括了.net的序列化与反序列化实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了.net的序列化与反序列化的实现方法。分享给大家供大家参考。具体方法如下: 1.序列化与反序列化概述 C#中如果需要:将一个结构很复杂的类的对象存储起来,或者通过网路传输到远程的客户端程序中去,这时就需要用到序列化,反序列化(Serialization & De