当前位置: 首页 > 知识库问答 >
问题:

Scala-在提交jar时(而不是在shell中)触发NullPointerError

龙星渊
2023-03-14

我的spark作业引发了一个无法跟踪的空指针异常。当我打印潜在的null变量时,它们都填充在每个Worker上。我的数据不包含空值,因为同一作业在spark Shell中工作。下面是作业的execute函数,后面是错误消息。

没有在函数中定义的所有helper方法都是在spark作业对象的主体中定义的,所以我相信闭包不是问题所在。

override def execute(sc:SparkContext) = {
  def construct_query(targetTypes:List[String]) = Map("query" ->
    Map("nested" ->
      Map("path"->"annotations.entities.items",
        "query"-> Map("terms"->
          Map("annotations.entities.items.type"-> targetTypes)))))

  val sourceConfig = HashMap(
    "es.nodes" -> params.targetClientHost
  )

  // Base elastic search RDD returning articles which match the above query on entity types
  val rdd = EsSpark.esJsonRDD(sc,
    params.targetIndex,
    toJson(construct_query(params.entityTypes)),
    sourceConfig
  ).sample(false,params.sampleRate)

  // Mapping ES json into news article html" target="_blank">object, then extracting the entities list of
  // well defined annotations
  val objectsRDD = rdd.map(tuple => {
    val maybeArticle =
      try {
        Some(JavaJsonUtils.fromJson(tuple._2, classOf[SearchableNewsArticle]))
      }catch {
        case e: Exception => None
      }
    (tuple._1,maybeArticle)
  }
  ).filter(tuple => {tuple._2.isDefined && tuple._2.get.annotations.isDefined &&
    tuple._2.get.annotations.get.entities.isDefined}).map(tuple => (tuple._1, tuple._2.get.annotations.get.entities.get))

  // flat map the RDD of entities lists into a list of (entity text, (entity type, 1)) tuples
  (line 79) val entityDataMap: RDD[(String, (String, Int))] = objectsRDD.flatMap(tuple => tuple._2.items.collect({
    case item if (item.`type`.isDefined) && (item.text.isDefined) &&
   (line 81)(params.entityTypes.contains(item.`type`.get))  => (cleanUpText(item.text.get), (item.`type`.get, 1))
  }))

  // bucketize the tuples RDD into entity text, List(entity_type, entity_count) to make count aggregation and file writeouts
 // easier to follow
 val finalResults: Array[(String, (String, Int))] = entityDataMap.reduceByKey((x, y) => (x._1, x._2+y._2)).collect()

  val entityTypeMapping = Map(
    "HealthCondition" -> "HEALTH_CONDITION",
    "Drug" -> "DRUG",
    "FieldTerminology" -> "FIELD_TERMINOLOGY"
  )

  for (finalTuple <- finalResults) {
    val entityText = finalTuple._1
    val entityType = finalTuple._2._1
    if(entityTypeMapping.contains(entityType))
    {
                if(!Files.exists(Paths.get(entityTypeMapping.get(entityType).get+".txt"))){
        val myFile = new java.io.FileOutputStream(new   File(entityTypeMapping.get(entityType).get+".txt"),false)
        printToFile(myFile) {p => p.println(entityTypeMapping.get(entityType))}
      }
    }
    val myFile = new java.io.FileOutputStream(new   File(entityTypeMapping.get(entityType).get+".txt"),true)
    printToFile(myFile) {p => p.println(entityText)}
  }

}

以及下面的错误消息:

java.lang.NullPointerException在com.quid.gazetteers.gazetteergenerator$$anonfun$4$$anonfun$apply$1.isdefinedat(gazetteergenerator.scala:81)在com.quid.gazetteers.gazetteergenerator$$anonfun$4$$anonfun$apply$1.isdefinedat(gazetteergenerator.scala:79)在scala.collection.traversablelike$$anonfun$collection$1.apply(traversablelike.scala.immutable.list.foreach(list.scala:318)在scala.collection.traversablelike$class.collection VersableLike.Scala:278)在Scala.Collection.AbstractTraversable.Collect(Traversable.Scala:105)在com.quid.gazetteers.gazetteergenerator.Scala:79)在com.quid.gazetteers.gazetteergenerator$$anonfun$4。Apply(Gazetteergenerator.Scala:79)在scala.Collection.Iterator$$anon$13。HasNext(iterator.Scala:371)在org.apache.spark.util.Collection.ExternalSorter.InserTall(ExternalSorter.Scala:189)在org.apache.spark.shu。在org.apache.spark.scheduler.shufflem上写(sortshufflewriter.scala:64)aptask.runtask(shufflemaptask.scala:73)在org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask.scala:41)在org.apache.spark.scheduler.task.run(task.scala:89)在org.apache.spark.executor.executor$taskrunner.run(executor.scala:214)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.1142)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.617)在java.lang.thread.run(thread.java:745)

共有1个答案

桂浩言
2023-03-14

这个问题已经解决了。params属性未序列化,且对spark Workers可用。解决方案是在需要params属性的区域范围内形成一个spark广播变量。

 类似资料:
  • 用户可以从datalist或can类型中选择一个项目,输入它自己的值。我通过JSON调用PHP脚本连接到数据库,以填充表单其余部分的其他信息。我希望当用户在列表输入中键入名称时(因此当内容模糊时),或者当用户单击DataList中的某个选项时,这会触发。 使用当输入失去焦点时会触发该函数,但当从数据列表中选择一项时,它也会等待“直到输入失去焦点,我希望事件立即触发” 使用单击datalist中的一

  • 使用Travis-CI,是否可以在不向GitHub推送新的提交的情况下触发重建? 用例:一个构建由于外部性而失败。出处其实是正确的。它将构建OK和通过,如果简单地重新运行。 例如,由于包服务器关闭而失败,但服务器已重新备份。但是,在推送一个新的提交之前,构建状态“卡”在“失败”。 除了推送一个“虚拟”提交之外,还有什么方法可以推动Travis-CI进行另一个构建吗?

  • 我对Android系统比较陌生,所以我在这里很困惑... NotificationReceiver.java manifest.xml

  • 管道A:仅由其自身回购之外的多个其他管道触发,但在同一项目中。作为被触发的结果,它对自己的回购进行更改,因此触发管道B。 pipleline B:仅由对其自身回购的更改触发,当被触发时,它将继续执行所需的任何操作 将语法流水线化 当前行为 null null 安装:https://docs.microsoft.com/en-us/azure/devops/cli/?view=azure-devop

  • 问题内容: 我正在尝试在应用程序中使用AngularJS,并在某种程度上取得了成功。 我能够获取数据并将其显示给用户。我有一个按钮,我想通过该按钮发布DELETE请求。下面是我的代码。 这是获取信息并将其显示给用户的功能。 数据已获取并正确显示。 但是,单击删除按钮后,不会触发其中的代码。在GoogleChrome浏览器的开发人员工具中,我可以看到为代码生成了有效值,但未触发代码。从我尝试去掉花括

  • 问题内容: 请原谅我; 我对Struts有点陌生。我遇到一个问题,即页面加载而不是我实际提交表单时发生了验证。我整日都在论坛上搜寻和搜寻,没有任何运气。我显然做错了一些事情,应该很容易确定,但是我还没有发现问题所在。 这是我的struts.xml的片段: 如您所见,如果验证成功,我有一个动作要提交给results.jsp。否则,我希望它再次显示我的index.jsp。据我所知,这些页面可以正确导航