将现有应用程序从Spark 1.6移动到Spark 2.2*(最终)会导致错误“org.apache.spark.SparkExctive:任务不可序列化”。我过于简化了我的代码,以演示同样的错误。代码查询拼花文件以返回以下数据类型:“org.apache.spark.sql.数据集[org.apache.spark.sql.行]”我应用一个函数来提取字符串和整数,返回字符串。一个固有的问题与Spark 2.2返回数据集而不是数据帧有关。(请参阅之前关于初步错误的帖子)如何编写数据集编码器以支持将函数映射到Scala Spark中的org.apache.spark.sql.数据集[字符串]
var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String
scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable
我目前解决这个问题的方法是嵌入内联代码(见下文),但并不实用,因为我的生产代码包含大量的参数和函数。我还尝试过转换为数据帧(就像spark 1.6中那样)和函数定义的变体,这些都没有证明是可行的解决方案。
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)
org。阿帕奇。火花SparkException:任务未序列化
要解决这个问题,请将您的所有功能
通过这种方式,您可以解决大多数序列化问题
示例代码
package common
object AppFunctions {
def append(s: String, start: Int) = s"${s}some thing"
}
object ExecuteQuery {
import common.AppFunctions._
[...]
val d3 = d2.map(s => append(s,5)) // Pass required values to method.
[...]
}
我在我大学的热图项目中,我们必须从txt文件(坐标、高度)中获取一些数据(212Go),然后将其放入HBase以在带有Express的Web客户端上检索它。 我练习使用144Mo文件,这是工作: 但是我现在使用212Go文件,我有一些内存错误,我猜收集方法会收集内存中的所有数据,所以212Go太多了。 所以现在我在尝试这个: 我得到了“org.apache.spark.SparkException
我正在尝试了解这个位置的scala代码。(我来自java背景)。 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala 我在下面的部分感觉完全迷失了 我知道并行化和平面映射的作用。我不明白arr1是如何初始化的。它是 int 类型
我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:
scala的版本是2.11.8;jdk为1.8;spark是2.0.2 我试图在spark apache的官方网站上运行一个LDA模型的示例,我从以下句子中得到了错误消息: 错误按摩是: 错误:(49,25)读取的值不是组织的成员。阿帕奇。火花SparkContext val dataset=spark。阅读格式(“libsvm”)^ 我不知道怎么解决。
我正在用SparkMaster api 7077执行JettyRun和ClusterMode。我将cassandra驱动程序和spark-cassandra连接器的jar传递给spark conf(setjar) 有些时候,如果我重新启动,它是有效的,但有几次,我不得不尝试和尝试,从来没有工作。 我尝试了一些答案,比如将Spark番石榴罐子重命名为19版本,但总是遇到同样的问题。 怎么回事?
我尝试使用I forest https://github.com/titicaca/spark-iforest,的scala实现,但是当我构建时(就像README中报告的< code>mvn clean package),它给我这些错误: 有人知道为什么吗?谢谢 scala版本2.11.12 火花版本2.4.0 maven版本3.5.2 我修改了pom.xml,调整了scala、spark和mav