这里是Scala新手。我在齐柏林飞艇笔记本上写了一份Spark作业的草稿。我使用数据集api,这样在执行ds.map(s=
现在我正在写一个实际的作业来在Apache Airflow中运行它。主文件如下所示:
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
case class MyCaseClass(...)
def run() {
spark.read
...
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
在这种情况下,我得到:
无法为内部类MainObj$MyCaseClass生成编码器,而无法访问定义此类的作用域。
如果我添加
org。阿帕奇。火花sql。催化剂编码器。外窥镜。addOuterScope(这个)
在run()内部或之前,我得到:
原因:java。伊奥。NotSerializableException:MainObj序列化堆栈:对象不可序列化(类:MainObj,值:MainObj@2f11d889)
我还尝试将case类移动到一个单独的文件(不起作用)或
run()
(甚至不编译)中。
对这个问题非常沮丧...有人能帮忙吗,或者至少给我指出一个解释
case类
,spark.implicits
和作用域
之间关系的地方?
我在写问题时漏掉了一段非常重要的代码。
实际上,我的坏代码是这样的:
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
case class MyCaseClass(...)
// This is what I left out
def someFunction() {
...
}
def run() {
spark.read
...
.map(s => { ...someFunction() ... }) // and this
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
这就是工作代码的样子:
// FIXED: moved case class to the root scope
case class MyCaseClass(...)
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
def run() {
spark.read
...
.map(s => { ... MainObj.someFunction() ... }) // FIXED
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
// FIXED: moved function to a companion object;
// now calling it inside map(...) does not trigger serialization
// of MainObj object, it works like a static method call in Java
def someFunction() {}
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
您需要在根级别定义案例类,而不是在类/对象中。
你能用下面的结构试试吗?
case class MyCaseClass(...)
class MainObj {
private val spark = SparkSession.builder()...getOrCreate()
import spark.implicits._
def run() {
spark.read
...
.map(s => MyCaseClass(...))
...
}
}
object MainObj {
def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) =
new MainObj(arguments, sparkConf)
def main(args: Array[String]): Unit = {
MainObj(...).run()
}
}
null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?
我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题: 如果我运行以下Scala/Spark代码,它将正常运行: 但是,在声明了此处建议的自定义数据帧类型之后 使用它的例子如下: 这次运行成功。 现在如果我再次运行下面的代码(同上) 我收到了错误信息: rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize
问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序
这给出的错误如下,任何帮助将是感激的:
我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:
问题内容: 我使用杰克逊测试了Scala案例类的序列化。 DeserializeTest.java 福斯卡拉 当我运行上述Java类时,抛出了一个异常: 我如何(反)序列化Scala案例类? 问题答案: Jackson希望您的类是JavaBean,这意味着它希望该类的每个属性都具有getX()和/或setX()。 选项1 您可以使用注释BeanProperty在Scala中创建JavaBean类。