当前位置: 首页 > 面试题库 >

运行Apache Spark作业时任务无法序列化异常

松旭
2023-03-14
问题内容

编写以下Java程序来试验apache spark。

该程序尝试从相应的文件中读取正负列表,将其与主文件进行比较并相应地过滤结果。

import java.io.Serializable;
import java.io.FileNotFoundException;
import java.io.File;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.List;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp implements Serializable{
  public static void main(String[] args) {
    String logFile = "/tmp/master.txt"; // Should be some file on your system
    String positive = "/tmp/positive.txt"; // Should be some file on your system
    String negative = "/tmp/negative.txt"; // Should be some file on your system

    JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"});

    JavaRDD<String> positiveComments = sc.textFile(logFile).cache();

    List<String> positiveList = GetSentiments(positive);
    List<String> negativeList= GetSentiments(negative);

    final Iterator<String> iterator = positiveList.iterator();
    int i = 0;
    while (iterator.hasNext())
    {
      JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

     numAs.saveAsTextFile("/tmp/output/"+ i);
     i++;
     }

  }

public static List<String> GetSentiments(String fileName) {
  List<String> input = new ArrayList<String>();
try
{
  Scanner sc = new Scanner(new File(fileName));

  while (sc.hasNextLine()) {
      input.add(sc.nextLine());
  }
}
catch (FileNotFoundException e){
    // do stuff here..
}
  return input;
}

}

执行spark作业时引发以下错误,

 Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
    at SimpleApp.main(SimpleApp.java:37)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

任何指针?


问题答案:

创建匿名类时,编译器会做一些事情:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(iterator.next());
        }
      });

它将被重写为:

JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        private Iterator<...> $iterator;
        public Boolean call(String s)
        {
          return s.contains($iterator.next());
        }
      });

这就是为什么您可以使用的NotSerializableException原因,因为迭代器不可序列化。

为了避免这种情况,只需在提取出next的结果之前:

String value = iterator.next();
JavaRDD<String> numAs = positiveComments.filter(new Function<String, Boolean>()
      {
        public Boolean call(String s)
        {
          return s.contains(value);
        }
      });


 类似资料:
  • 问题内容: 所有守护程序都在运行,jps显示: 但是示例继续失败,并带有以下异常: 因为它说问题出在配置中,所以我在这里发布配置文件。目的是创建一个单节点群集。 yarn-site.xml core-site.xml hdfs-site.xml mapred-site.xml 请告诉我们缺少了什么或我在做什么错。 问题答案: 您使用大写字母,这可能是为什么它无法解决的原因。尝试使用官方文档中建议的

  • 我有一个这样的转变: Pageview是一种:Pageview。JAVA 我在Spark上注册了这样的课程: 异常线程"main"org.apache.spark.SparkExctive:任务不能在org.apache.spark.util.ClosureCleaner$. ensureSerializable(ClosureCleaner.scala:166)在org.apache.spark

  • 无法解决以下由)触发的序列化问题。我认为可以解决序列化问题,但事实并非如此。那么,如何使用? 我假设变量和是不可序列化的,但是我如何正确地序列化它们,以便代码能够在集群上工作,而不仅仅是在本地工作呢? 上面显示的代码抛出错误:

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序

  • 问题内容: 我在一个比较大的Web应用程序上工作,后端主要使用PHP。代码中有几个地方需要完成一些任务,但是我不想让用户等待结果。例如,在创建新帐户时,我需要向他们发送欢迎电子邮件。但是,当他们按下“完成注册”按钮时,我不想让他们等到实际发送电子邮件之后,我只想开始该过程,并立即向用户返回一条消息。 到目前为止,在某些地方,我一直在使用exec()感觉像是被黑客入侵。基本上是这样的: 这似乎可行,

  • 我正在尝试自动登录到2008 R2服务器上的一组RDP客户端,当从桌面双击批处理文件时,下面的批处理工作正常,但是当将.bat文件设置为作为任务调度程序中的任务运行时,什么都没有发生 计划任务被设置为在管理员帐户下运行(只有一个可用),并且(开始于)可选字段也被设置为“C:\Users\Administrator\Desktop”。