以下程序尝试为每一行(在RDD映射中)调用3个函数:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
class TagCalculation extends Serializable {
def test1(x: String) = x + " test1"
def test2(x: String) = x + "test2"
def test3(x: String) = x + "test3"
def test5(arg1: java.lang.Integer, arg2: String, arg3: scala.collection.immutable.$colon$colon[Any]) = "test mix2"
}
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone")))
val get_test = new TagCalculation
val field = Array("test1","test2","test3")
val bb = df.rdd.map(row => {
val reValue1 = "start"
val ret = for(every <- field)
yield {
val test_para = Array(reValue1)
val argtypes = test_para.map(_.getClass)
val method4 = get_test.getClass.getMethod(every, argtypes: _*)
val bbq = method4.invoke(get_test, test_para: _*)
if (field.last == every)
bbq
}
ret.last
})
但有些错误输出:
它可能是由“隐式val格式=DefaultFormats”引起的。但我需要在“映射”之前提取值。
问题是因为您是在调用类
中定义TagCalculation
类,在该类中初始化和使用对象。只需将其移到调用类
之外,或使其成为一个单独的类
即可解决NotSerializableException
的问题。
下面的程序尝试为每个ROW(在RDD映射中)调用3个函数: 但也有一些错误: 组织。阿帕奇。火花SparkException:任务在组织中不可序列化。阿帕奇。火花util。ClosureCleaner美元。EnsureCleaner.scala:304)可在org。阿帕奇。火花util。ClosureCleaner美元。org$apache$spark$util$ClosureCleaner$$c
我已经上了三节课 任务未序列化
我有一个RDD格式为RDD[((Long,Long),(Long,Long))],我需要隐藏或转换为RDD[(Long,Long),(Long,Long,Long,Long))],其中第二个RDD元组基于第一个RDD的函数。 我正在尝试实现这个基于地图的功能,但是,我认为在这里做了一些错误的事情。请帮我解决这个问题。
我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中
null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?
我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace: