当前位置: 首页 > 知识库问答 >
问题:

在Spark中创建数据集期间发生Scala反射异常

庄萧迟
2023-03-14

我想在Spark Jobserver上运行Spark Job。在执行过程中,我遇到了一个例外:

堆栈:

JAVARuntimeException:scala。ScalarReflectionException:类com。一些实例仪器数据JavaMirror和org中的SQLMapping。阿帕奇。火花util。MutableURLClassLoader@55b699ef属于组织类。阿帕奇。火花util。类路径为[file:/app/spark job server.jar]且父级为sun的MutableURLClassLoader。杂项。发射器$AppClassLoader@2e817b38属于太阳类。杂项。找不到类路径为[…/classpath jars/]的启动器$AppClassLoader。

在斯卡拉。反映内部的镜像$RootsBase。scala的staticClass(Mirrors.scala:123)。反映内部的镜像$RootsBase。staticClass(Mirrors.scala:22)在com上发布。一些实例仪器DataRetriever$$anonfun$combineMappings$1$$TypeCreator 15$1。在scala应用(DataRetriever.scala:136)。反映应用程序编程接口。TypeTags$WeakTypeTagImpl。在scala的tpe$lzycompute(TypeTags.scala:232)。反映应用程序编程接口。TypeTags$WeakTypeTagImpl。tpe(TypeTags.scala:232)网址:org。阿帕奇。火花sql。催化剂编码器。ExpressionEncoder美元。在org上申请(ExpressionEncoder.scala:49)。阿帕奇。火花sql。编码器$。产品(Encoders.scala:275)位于org。阿帕奇。火花sql。LowPrioritySql包含其$class。newProductEncoder(SQLImplicits.scala:233)位于org。阿帕奇。火花sql。是的。com上的newProductEncoder(SQLImplicits.scala:33)。一些实例仪器DataRetriever$$anonfun$combineMappings$1。在com上申请(DataRetriever.scala:136)。一些实例仪器DataRetriever$$anonfun$combineMappings$1。在scala应用(DataRetriever.scala:135)。util。成功$$anonfun$map$1。在scala应用(试试scala:237)。util。试试美元。在scala应用(试试scala:192)。util。成功在scala上绘制地图(试试scala:237)。同时发生的未来$$anonfun$map$1。在scala申请(Future.scala:237)。同时发生的未来$$anonfun$map$1。在scala申请(Future.scala:237)。同时发生的impl。CallbackRunnable。在scala跑步(Promise.scala:32)。同时发生的impl。ExecutionContextImpl$AdaptedForkJoinTask。scala的执行官(ExecutionContextImpl.scala:121)。同时发生的分叉连接。这是一项艰巨的任务。scala的doExec(ForkJoinTask.java:260)。同时发生的分叉连接。ForkJoinPool$WorkQueue。scala上的runTask(ForkJoinPool.java:1339)。同时发生的分叉连接。福克斯:泳池。scala的runWorker(ForkJoinPool.java:1979)。同时发生的分叉连接。ForkJoinWorkerThread。运行(ForkJoinWorkerThread.java:107)

DataRetriever中,我将简单的case类转换为DataSet。

案例类别定义:

case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

导致问题的代码:

import session.implicits._
def someFunc(future: Future[ID]): Dataset[SQLMappins] = {
future.map {f =>
val seq: Seq[SQLMapping] = getFromEndpoint(f)
val ds: Dataset[SQLMapping] = seq.toDS()
...
 }
}

作业有时会工作,但如果我重新运行作业,它将抛出异常。

更新2018年3月28日我忘了提到一个细节,这很重要。数据集是在Future内部构建的。

共有1个答案

司马渝
2023-03-14

在将来调用toDS(),导致ScalaReflse异常。

我决定在未来之外构建数据集。地图

您可以验证数据集不能在将来构建。使用此示例作业映射

package com.example.sparkapplications

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobValidation

object FutureJob extends SparkJob{
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._
    val f = Future{
      val seq = Seq(
        Dummy("1", 1),
        Dummy("2", 2),
        Dummy("3", 3),
        Dummy("4", 4),
        Dummy("5", 5)
      )

      val ds = seq.toDS

      ds.collect()
    }

    Await.result(f, 10 seconds)
  }

  case class Dummy(id: String, value: Long)
  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}

稍后,我将提供信息,如果问题仍然使用火花2.3.0,当你通过火花提交直接传递jar。

 类似资料:
  • 我通过连接单独的月、日、年和时间列创建了日期列,但是月和日列的输入数据形式为1,而不是01表示月和日。这是我返回空列的原因吗?还是有其他原因?如果这就是原因,那么如何将日和月列从1改为01、2改为02、…? 这是我第一次使用时间戳,而且我是Scala新手,所以我非常感谢您的帮助。

  • 我正在尝试使用JDBC处理Presto上的查询,并将结果集传递回Spark,以便在其上创建临时表。我的结果集在列表中 我从kafka producer获得了json Msg形式的查询。因此,我们在spark中创建了kafka consumer,以获取信息并进行进一步处理。 以下是我的主要功能: 以下是将结果集返回给主函数的process_query方法: 但我仍然得到了这个错误输出 请帮帮忙

  • 我有一个小数据集,它将是Spark工作的结果。为了方便起见,我正在考虑在作业结束时将此数据集转换为数据帧,但很难正确定义模式。问题是下面的最后一个字段(

  • 我来自Java背景,刚接触Scala。 我正在使用Scala和Spark。但是我不明白我在哪里使用和。 有人能告诉我在哪种情况下我需要使用这两个操作符吗?和之间有什么区别吗?

  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 这是我的问题。 我在犯错误。 Java语言lang.IllegalArgumentException:在EntityManager中创建查询时发生异常:异常描述:语法错误分析查询[select sum(u.expenseAmount),u.wdExpenseGroup.expenseGroupName from WdExpense u其中MONTH(CAST(u.expenseDate as da