我对Spark和Scala相当陌生。我试图调用一个函数作为火花UDF,但我遇到了这个错误,我似乎不能解决。
我知道在Scala中,Array和Seq是不一样的。WrappedArray是Seq的一个子类型,WrappedArray和Array之间有隐式转换,但我不确定为什么在UDF的情况下没有这样做。
任何帮助我理解和解决这个问题的指针都非常感谢。
下面是代码片段
def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))
case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF
idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
|-- id: integer (nullable = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
import sqlContext.implicits._
/* Hive context is exposed as sqlContext */
val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show
查看数据帧“j”
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
但是,调用UDF的数据帧“k”上的操作失败,并出现以下错误-
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
将函数filterMapKeysSusSet中的数据类型从Array[Int]更改为Seq[Int]似乎可以解决上述问题。
def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
+---+--------------------+----------------+--------------------+
| id| m| a| filteredMap|
+---+--------------------+----------------+--------------------+
| 1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
| 2|Map(1 -> 100, 2 -...| [100, 200]| Map()|
| 3|Map(3 -> 300, 4 -...| [1, 2]| Map()|
+---+--------------------+----------------+--------------------+
因此,看起来数据帧“idDF”上的数组类型实际上是一个包装数组而不是数组 - 因此对“filterMapKeysWithSet”的函数调用失败了,因为它期望数组,而是得到了一个包装数组/ Seq(在Scala 2.8及更高版本中没有隐式转换为数组)。
在我的项目中,我有这样一个枚举: 我有这个代码: 我有个例外 myMap由数据库中的数据填充,知道它是SQL Server数据库,并且从数据库返回的myKey在数据库中是tinyint类型。 你能告诉我我做错了什么吗?谢谢 当做
问题内容: 因此,我今天晚上将XCode更新为7.3。 在我的一个项目中,在设置字体的几个标签上出现以下错误: 编辑: 这是我的导航栏中标题视图的代码: 这是我的UILabel属性字符串的代码: 在我整个项目中,只有两个文件会发生这种情况。 我打开了另一个项目,但是即使在此我也确实为多个标签设置了字体,我也可以构建并运行它而没有任何错误。 知道为什么会这样吗? TIA! 问题答案: 在SO的其他地
我刚开始编程,我总是收到错误消息,“不兼容的类型,int不能转换为int[]”,问题是如果R1和R2的长度相等,就把它们加在一起,如果不是,打印一条消息,说“数组必须是相同的长度”,如果这很重要,不确定我在哪里出错了,任何帮助都将非常感谢
我在HackerRank(对角线差异)上做这个问题,我现在面临一个问题。 以下是问题的链接: https://www.hackerrank.com/challenges/diagonal-difference/problem 问题是,我总是得到一个错误:。 我尝试初始化变量j,但这也不能解决问题。 下面是我的代码:
问题内容: 为什么会收到错误,我该如何解决? 如您所见,在声明变量时,我已经尝试过用顶部附近的位置进行替换,但是似乎并没有完成任务。 问题答案: 更改使用数组索引从double到INT所有变量(即变量,,)。数组索引是整数。
问题内容: 我想知道是否有任何方法可以将Integer类型的变量转换为BigInteger。我尝试类型转换Integer变量,但出现一个错误消息,指出不可转换的类型。 问题答案: 您想要的方法是BigInteger#valueOf(long val) 。 例如, 首先创建String是不必要的,也是不希望的。