下面的程序尝试为每个ROW(在RDD映射中)调用3个函数:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
class TagCalculation extends Serializable {
def test1(x: String) = x + " test1"
def test2(x: String) = x + "test2"
def test3(x: String) = x + "test3"
def test5(arg1: java.lang.Integer, arg2: String, arg3: scala.collection.immutable.$colon$colon[Any]) = "test mix2"
}
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone")))
val get_test = new TagCalculation
val field = Array("test1","test2","test3")
val bb = df.rdd.map(row => {
val reValue1 = "start"
val ret = for(every <- field)
yield {
val test_para = Array(reValue1)
val argtypes = test_para.map(_.getClass)
val method4 = get_test.getClass.getMethod(every, argtypes: _*)
val bbq = method4.invoke(get_test, test_para: _*)
if (field.last == every)
bbq
}
ret.last
})
但也有一些错误:
组织。阿帕奇。火花SparkException:任务在组织中不可序列化。阿帕奇。火花util。ClosureCleaner美元。EnsureCleaner.scala:304)可在org。阿帕奇。火花util。ClosureCleaner美元。org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)位于org。阿帕奇。火花util。ClosureCleaner美元。clean(ClosureCleaner.scala:122)在org上。阿帕奇。火花SparkContext。clean(SparkContext.scala:2032)位于org。阿帕奇。火花rdd。RDD$$anonfun$map$1。在org上申请(RDD.scala:314)。阿帕奇。火花rdd。RDD$$anonfun$map$1。在org上申请(RDD.scala:313)。阿帕奇。火花rdd。RDDOperationScope$。在org.com上使用Scope(RDDOperationScope.scala:147)。阿帕奇。火花rdd。RDDOperationScope$。使用org上的scope(RDDOperationScope.scala:108)。阿帕奇。火花rdd。RDD。组织上的Wisscope(RDD.scala:306)。阿帕奇。火花rdd。RDD。地图(RDD.scala:313)。。。。。。。。在org。阿帕奇。火花部署SparkSubmit美元。在org上提交(SparkSubmit.scala:205)。阿帕奇。火花部署SparkSubmit美元。main(SparkSubmit.scala:120)位于org。阿帕奇。火花部署SparkSubmit。main(SparkSubmit.scala)由以下原因引起:java。伊奥。NotSerializableException:org。json4s。默认格式$
有什么指示吗?
这可能是由“隐式val格式=默认格式”引起的。但我需要在“地图”之前提取值。
这个问题是因为您在初始化和使用对象的调用类
内定义了TagCompation
类。只要将它移到调用类
之外,或者使它成为一个单独的类,NotSerializableExctive
的问题就应该得到解决。
以下程序尝试为每一行(在RDD映射中)调用3个函数: 但有些错误输出: 它可能是由“隐式val格式=DefaultFormats”引起的。但我需要在“映射”之前提取值。
我有一个RDD格式为RDD[((Long,Long),(Long,Long))],我需要隐藏或转换为RDD[(Long,Long),(Long,Long,Long,Long))],其中第二个RDD元组基于第一个RDD的函数。 我正在尝试实现这个基于地图的功能,但是,我认为在这里做了一些错误的事情。请帮我解决这个问题。
我有一个这样的转变: Pageview是一种:Pageview。JAVA 我在Spark上注册了这样的课程: 异常线程"main"org.apache.spark.SparkExctive:任务不能在org.apache.spark.util.ClosureCleaner$. ensureSerializable(ClosureCleaner.scala:166)在org.apache.spark
我已经上了三节课 任务未序列化
我通读了地图和地图分区之间的理论差异, 但我下面描述的问题更多地基于GC活动 = = 提前感谢。任何帮助都将不胜感激。
我有两个RDD: RDD1=(单词,分数)#单词:字符串|分数:int 因此,对于RDD2中的每个“id”,我想计算文本中每个单词的平均分数,如果它有分数的话 我收到了这个错误信息 异常:似乎您正试图广播RDD或从操作或转换引用RDD。RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x:rdd2.values.count()*x)无效,因为值转换和