我在Spark(Scala)中使用UDF遇到问题。这是一个示例代码:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, udf}
val spark = SparkSession.builder.appName("test")
.master("local[*]")
.getOrCreate()
import spark.implicits._
def func(a: Array[Int]): Array[Int] = a
val funcUDF = udf((a: Array[Int]) => func(a))
var data = Seq(Array(1, 2, 3), Array(3, 4, 5), Array(6, 2, 4)).toDF("items")
data = data.withColumn("a", funcUDF(col("items")))
data.show()
我得到的错误与ClassCastException有关,表示不可能从<code>scala.collection.mutable强制转换。WrappedArray$ofReftoorg.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2
。我在下面添加了堆栈的一部分。如果有帮助,我正在使用https://community.cloud.databricks.com/.
问题是您的“items”列是WrappedArray类型(这是每个array like类型的Spark类型)。并且Array和WrappedArray之间没有隐式转换。所以我会建议使用Seq,因为WrappedArray是Seq的子类,但不是Array的子类。
这工作原理:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions.{col, udf}
val spark = SparkSession.builder.appName("test")
.master("local[*]")
.getOrCreate()
import spark.implicits._
def func(a: Array[Int]): Array[Int] = a
val funcUDF = udf((a: Seq[Int]) => func(a.toArray))
var data = Seq(Array(1, 2, 3), Array(3, 4, 5), Array(6, 2, 4)).toDF("items")
data = data.withColumn("a", funcUDF(col("items")))
data.show()
我更喜欢Python而不是Scala。但是,由于Spark本机是用Scala编写的,出于明显的原因,我希望我的代码在Scala版本中比在Python版本中运行得更快。 基于这个假设,我想学习&为大约1 GB的数据编写一些非常常见的预处理代码的Scala版本。数据取自Kaggle上的SpringLeaf比赛。只是为了给出数据的概述(它包含1936个维度和145232行)。数据由各种类型组成,如int
如何实现reduceByKey而不是上面的代码来提供相同的映射?
代码示例: 第二个问题--模拟scala对象,似乎需要使用其他方法来创建这样的服务。
我有一个RDD,其模式如下: (我们称之为) 我希望创建一个新的RDD,每一行都为,键和值属于。 我希望输出如下: 有人能帮我处理这段代码吗? 我的尝试: 错误:值映射不是Char的成员 我理解这是因为map函数只适用于,而不是每个。请帮助我在中使用嵌套函数。
嗨,我正在尝试从一群执行者那里登录到一个Kafka主题,他们使用ApacheSpark和Log4J以及KafkaAppender。我可以使用基本的文件附加器与执行者登录,但不能登录到Kafka。 这是我的log4j.properties我为此定制的: 这是我的代码(到目前为止)。我试图传递一个记录器定义,以便每个执行者都能得到一个副本,但我不知道为什么它不会传到Kafka: 下面是日志文件的一些示
以下是我的构建。SBT内容: