我想在Spark Jobserver上运行Spark Job。在执行过程中,我遇到了一个例外:
堆栈:
JAVARuntimeException:scala。ScalarReflectionException:类com。一些实例仪器数据JavaMirror和org中的SQLMapping。阿帕奇。火花util。MutableURLClassLoader@55b699ef属于组织类。阿帕奇。火花util。类路径为[file:/app/spark job server.jar]且父级为sun的MutableURLClassLoader。杂项。发射器$AppClassLoader@2e817b38属于太阳类。杂项。找不到类路径为[…/classpath jars/]的启动器$AppClassLoader。
在斯卡拉。反映内部的镜像$RootsBase。scala的staticClass(Mirrors.scala:123)。反映内部的镜像$RootsBase。staticClass(Mirrors.scala:22)在com上发布。一些实例仪器DataRetriever$$anonfun$combineMappings$1$$TypeCreator 15$1。在scala应用(DataRetriever.scala:136)。反映应用程序编程接口。TypeTags$WeakTypeTagImpl。在scala的tpe$lzycompute(TypeTags.scala:232)。反映应用程序编程接口。TypeTags$WeakTypeTagImpl。tpe(TypeTags.scala:232)网址:org。阿帕奇。火花sql。催化剂编码器。ExpressionEncoder美元。在org上申请(ExpressionEncoder.scala:49)。阿帕奇。火花sql。编码器$。产品(Encoders.scala:275)位于org。阿帕奇。火花sql。LowPrioritySql包含其$class。newProductEncoder(SQLImplicits.scala:233)位于org。阿帕奇。火花sql。是的。com上的newProductEncoder(SQLImplicits.scala:33)。一些实例仪器DataRetriever$$anonfun$combineMappings$1。在com上申请(DataRetriever.scala:136)。一些实例仪器DataRetriever$$anonfun$combineMappings$1。在scala应用(DataRetriever.scala:135)。util。成功$$anonfun$map$1。在scala应用(试试scala:237)。util。试试美元。在scala应用(试试scala:192)。util。成功在scala上绘制地图(试试scala:237)。同时发生的未来$$anonfun$map$1。在scala申请(Future.scala:237)。同时发生的未来$$anonfun$map$1。在scala申请(Future.scala:237)。同时发生的impl。CallbackRunnable。在scala跑步(Promise.scala:32)。同时发生的impl。ExecutionContextImpl$AdaptedForkJoinTask。scala的执行官(ExecutionContextImpl.scala:121)。同时发生的分叉连接。这是一项艰巨的任务。scala的doExec(ForkJoinTask.java:260)。同时发生的分叉连接。ForkJoinPool$WorkQueue。scala上的runTask(ForkJoinPool.java:1339)。同时发生的分叉连接。福克斯:泳池。scala的runWorker(ForkJoinPool.java:1979)。同时发生的分叉连接。ForkJoinWorkerThread。运行(ForkJoinWorkerThread.java:107)
在DataRetriever
中,我将简单的case类转换为DataSet。
案例类别定义:
case class SQLMapping(id: String,
it: InstrumentPrivateKey,
cc: Option[String],
ri: Option[SourceInstrumentId],
p: Option[SourceInstrumentId],
m: Option[SourceInstrumentId])
case class SourceInstrumentId(instrumentId: Long,
providerId: String)
case class InstrumentPrivateKey(instrumentId: Long,
providerId: String,
clientId: String)
导致问题的代码:
import session.implicits._
def someFunc(future: Future[ID]): Dataset[SQLMappins] = {
future.map {f =>
val seq: Seq[SQLMapping] = getFromEndpoint(f)
val ds: Dataset[SQLMapping] = seq.toDS()
...
}
}
作业有时会工作,但如果我重新运行作业,它将抛出异常。
更新2018年3月28日我忘了提到一个细节,这很重要。数据集是在Future
内部构建的。
在将来调用toDS()
,导致ScalaReflse异常。
我决定在未来之外构建数据集。地图
。
您可以验证数据集不能在将来构建。使用此示例作业映射
。
package com.example.sparkapplications
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobValidation
object FutureJob extends SparkJob{
override def runJob(sc: SparkContext,
jobConfig: Config): Any = {
val session = SparkSession.builder().config(sc.getConf).getOrCreate()
import session.implicits._
val f = Future{
val seq = Seq(
Dummy("1", 1),
Dummy("2", 2),
Dummy("3", 3),
Dummy("4", 4),
Dummy("5", 5)
)
val ds = seq.toDS
ds.collect()
}
Await.result(f, 10 seconds)
}
case class Dummy(id: String, value: Long)
override def validate(sc: SparkContext,
config: Config): SparkJobValidation = SparkJobValid
}
稍后,我将提供信息,如果问题仍然使用火花2.3.0,当你通过火花提交直接传递jar。
我通过连接单独的月、日、年和时间列创建了日期列,但是月和日列的输入数据形式为1,而不是01表示月和日。这是我返回空列的原因吗?还是有其他原因?如果这就是原因,那么如何将日和月列从1改为01、2改为02、…? 这是我第一次使用时间戳,而且我是Scala新手,所以我非常感谢您的帮助。
我正在尝试使用JDBC处理Presto上的查询,并将结果集传递回Spark,以便在其上创建临时表。我的结果集在列表中 我从kafka producer获得了json Msg形式的查询。因此,我们在spark中创建了kafka consumer,以获取信息并进行进一步处理。 以下是我的主要功能: 以下是将结果集返回给主函数的process_query方法: 但我仍然得到了这个错误输出 请帮帮忙
我有一个小数据集,它将是Spark工作的结果。为了方便起见,我正在考虑在作业结束时将此数据集转换为数据帧,但很难正确定义模式。问题是下面的最后一个字段(
我来自Java背景,刚接触Scala。 我正在使用Scala和Spark。但是我不明白我在哪里使用和。 有人能告诉我在哪种情况下我需要使用这两个操作符吗?和之间有什么区别吗?
我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (
这是我的问题。 我在犯错误。 Java语言lang.IllegalArgumentException:在EntityManager中创建查询时发生异常:异常描述:语法错误分析查询[select sum(u.expenseAmount),u.wdExpenseGroup.expenseGroupName from WdExpense u其中MONTH(CAST(u.expenseDate as da