Question:

Task not serializable: Spark

艾自强
2023-03-14

My Spark job throws "Task not serializable" at runtime. Can anyone tell me what I am doing wrong?

@Component("loader")
@Slf4j
public class LoaderSpark implements SparkJob {
    private static final int MAX_VERSIONS = 1;

    private final AppProperties props;


    public LoaderSpark(
            final AppProperties props
    ) {
        this.props = props;
    }


    @Override
    public void run(SparkSession spark, final String... args) throws IOException {

        HBaseUtil hBaseUtil = new HBaseUtil(props);

        byte[][] prefixes = new byte[][]{toBytes("document"),
                toBytes("dataSource"),
                toBytes("hold:")};

        Filter filter = new MultipleColumnPrefixFilter(prefixes);

        Scan scan = new Scan();
        scan.addFamily(toBytes("data"));
        scan.setCaching(100000);
        scan.setMaxVersions(MAX_VERSIONS);
        scan.setFilter(filter);


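        // The stacktrace below points at this mapPartitions call: the method
        // reference this::extractResult captures the enclosing LoaderSpark
        // instance, which Spark's ClosureCleaner then fails to serialize.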
        JavaRDD<TestMethod> mapFileJavaRDD
                = hBaseUtil.createScanRdd(spark, "TestTable", scan).mapPartitions(tuple -> {

            return StreamUtils.asStream(tuple)
                    .map(this::extractResult)
                    .filter(Objects::nonNull)
                    .iterator();

        });


        Dataset<TestMethod> testDataset = spark.createDataset(mapFileJavaRDD.rdd(), bean(TestMethod.class));
        testDataset.limit(100);

    }

    private TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {

        TestMethod.TestMethodBuilder testBuilder = TestMethod.builder();
        Result result = resultTuple._2();
        CdfParser cdfParser = new CdfParser();

        List<String> holdingId = new ArrayList<>();

        testBuilder.dataSource(Bytes.toString(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("dataSource"))));
        testBuilder.document(cdfParser.fromXml(result.getValue(Bytes.toBytes("data"),
                Bytes.toBytes("document"))));

        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes("data"));
        for (byte[] bQualifier : familyMap.keySet()) {
            if (Bytes.toString(bQualifier).contains("hold:")) {
                log.info(Bytes.toString(bQualifier));
                holdingId.add(Bytes.toString(bQualifier));
            }
        }
        testBuilder.holding(holdingId);

        return testBuilder.build();
    }

}

Here is the stacktrace:

  2020-04-29 12:48:59,837 INFO  [Thread-4]o.a.s.d.y.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: Failed to execute CommandLineRunner
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
        at org.oclc.googlelinks.spark.SpringSparkJob.main(SpringSparkJob.java:56)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:694)
    Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
        at org.apache.spark.api.java.JavaRDDLike$class.mapPartitions(JavaRDDLike.scala:155)
        at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitions(JavaRDDLike.scala:45)
        at org.oclc.googlelinks.spark.job.LoaderSpark.run(LoaderSpark.java:79)
        at org.oclc.googlelinks.spark.SpringSparkJob.run(SpringSparkJob.java:79)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784)
        ... 8 more

2 Answers

狄天逸
2023-03-14

To invoke a static method you do not need to serialize the class; you only need the declaring class to be reachable by the classloader (which is the case here, since the jar archive is shared between the driver and the workers).

Thanks to https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
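A minimal sketch of that idea applied to the question's code, assuming extractResult needs no instance state (which holds here, since it only reads its argument and locals):

// Declare the mapping function static so the method reference no longer
// captures `this`; only the stateless lambda itself is shipped to executors.
private static TestMethod extractResult(Tuple2<ImmutableBytesWritable, Result> resultTuple) {
    // ... body unchanged from the question ...
}

// In run(), reference the method by class instead of by instance:
JavaRDD<TestMethod> mapFileJavaRDD
        = hBaseUtil.createScanRdd(spark, "TestTable", scan).mapPartitions(tuple ->
                StreamUtils.asStream(tuple)
                        .map(LoaderSpark::extractResult) // no `this` captured
                        .filter(Objects::nonNull)
                        .iterator());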

寿翰飞
2023-03-14

Try adding a getter and setter for props:

public void setProps(AppProperties props) {
    this.props = props;
}

public AppProperties getProps() {
    return props;
}
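Note that props is declared final in the question's class, so the setter above only compiles if that modifier is dropped; a sketch of the adjusted field declaration:

// `props` can no longer be final once a setter reassigns it.
private AppProperties props;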
