我更喜欢Python而不是Scala。但是,由于Spark本机是用Scala编写的,出于明显的原因,我希望我的代码在Scala版本中比在Python版本中运行得更快。
基于这个假设,我想学习&为大约1 GB的数据编写一些非常常见的预处理代码的Scala版本。数据取自Kaggle上的SpringLeaf比赛。只是为了给出数据的概述(它包含1936个维度和145232行)。数据由各种类型组成,如int,float,string,boolean。我正在使用8个核心中的6个进行火花处理;这就是为什么我使用minpartitions=6
,以便每个核心都有东西要处理。
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
从性能上看,这种真实数据的Scala代码运行速度似乎比Python版本慢4倍。对我来说,好消息是这给了我继续留在Python的良好动力。坏消息是我不太明白为什么?
下面可以找到讨论代码的原始答案。
首先,您必须区分不同类型的API,每种API都有自己的性能考虑。
(带有基于JVM的编排的纯Python结构)
基本注意事项与前面基本相同,但有一些附加问题。虽然MLlib使用的基本结构是简单的Python RDD对象,但所有算法都是直接使用Scala执行的。
这意味着将Python对象转换为Scala对象的成本会增加,反过来也会增加内存使用量,并且我们将在后面介绍一些额外的限制。
到目前为止(Spark2.x),基于RDD的API处于维护模式,并计划在Spark3.0中删除。
随着矢量化UDF(SPARK-21190和进一步的扩展)的引入,这一点
可能会改进
有了显著的改进,该UDF使用箭头流进行高效的数据交换,并进行零拷贝反序列化。对于大多数应用程序,它们的次要开销可以忽略不计。
还要确保避免在DataFrames
和RDDS
之间不必要地传递数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。
值得注意的是,Py4J调用具有相当高的延迟。这包括以下简单调用:
from pyspark.sql.functions import col
col("foo")
Spark2.x中的结构化流似乎缩小了语言之间的差距,但目前它仍处于早期阶段。然而,基于RDD的API在数据库文档(访问日期2017-03-03)中已经被引用为“遗留流”,因此有理由期待进一步的统一努力。
并非所有Spark特性都通过PySpark API公开。一定要检查您需要的部分是否已经实现,并尝试了解可能的限制。
当您使用MLlib和类似的混合上下文(请参见从任务调用Java/Scala函数)时,这一点尤为重要。公平地说,PySpark API的某些部分,比如mllib.linalg
,提供了比Scala更全面的方法集。
对于高度依赖Python代码库的项目,纯Python替代方案(如Dask或Ray)可能是一个有趣的替代方案。
Spark DataFrame(SQL,Dataset)API提供了一种在PySpark应用程序中集成Scala/Java代码的优雅方法。您可以使用dataframes
向本机JVM代码公开数据并回读结果。我在其他地方解释了一些选项,您可以在如何在Pyspark中使用Scala类中找到一个Python-Scala往返的工作示例。
通过引入用户定义的类型(请参见如何在Spark SQL中为自定义类型定义模式?),可以进一步增强它。
(免责声明:Pythonista的观点。很可能我错过了Scala的一些技巧)
首先,您的代码中有一个部分根本没有意义。如果您已经使用zipwithindex
或enumerate
创建了(key,value)
对,那么创建字符串然后再拆分它有什么意义呢?flatmap
不能递归工作,因此您可以简单地生成元组,并跳过对map
的跟踪。
通常情况下,我不会详细讨论这个问题,但据我所知,这是Scala代码中的一个瓶颈。在JVM上连接字符串是一个相当昂贵的操作(例如,请参见:scala中的字符串连接是否与Java中的字符串连接一样昂贵?)。这意味着类似于_.reducebykey((v1:String,v2:String)=>v1+','+v2)
(相当于代码中的input4.reducebykey(valsConcat)
)的东西不是一个好主意。
如果要避免使用groupbykey
,可以尝试将aggregatebykey
与stringbuilder
一起使用。类似的方法应该能起到作用:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
但我怀疑这值得大惊小怪。
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
巨蟒:
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
在本地[6]
模式下(Intel(R)至强(R)CPU E3-1245 V2@3.40GHz),每个执行器需要4GB内存(n=3):
我非常肯定,大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,下面是用Python编写的幼稚单线程代码,它在不到一分钟的时间内在这台机器上执行相同的任务:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])
我正在构建我的第一个spark应用程序。 http://spark.apache.org/downloads.html告诉我火花2。x是根据Scala 2.11构建的。 在Scala网站上https://www.scala-lang.org/download/all.html我看到的版本是2.11.0-2.11.11 所以我的问题是:火花网站上的2.11到底是什么意思。它是2.11.0 - 2.1
我对Spark很陌生,目前正在通过玩pyspark和Spark-Shell来探索它。 现在的情况是,我用pyspark和Spark-Shell运行相同的spark作业。 这是来自Pyspark: 使用spark-shell,工作在25分钟内完成,使用pyspark大约55分钟。如何让Spark独立地用pyspark分配任务,就像它用Spark-shell分配任务一样?
我有一个RDD,其模式如下: (我们称之为) 我希望创建一个新的RDD,每一行都为,键和值属于。 我希望输出如下: 有人能帮我处理这段代码吗? 我的尝试: 错误:值映射不是Char的成员 我理解这是因为map函数只适用于,而不是每个。请帮助我在中使用嵌套函数。
我正在尝试使用spark df读取spark中的CSV文件。文件没有标题列,但我想有标题列。如何做到这一点?我不知道我是否正确,我写了这个命令- 并将列名作为列的_c0和_c1。然后我尝试使用:val df1=df.with列重命名("_c0","系列")将列名更改为所需的名称,但我得到"with列重命名"不是单元上的成员。 PS:我已经导入了spark.implicits._和spark.sql
以下是我的构建。SBT内容:
{“IFAM”:“EQR”,“KTM”:1430006400000,“COL”:21,“Data”:[{“MLRATE”:“30”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“31”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“30”,“NROUT”:“5”,“UP”:“NULL”:“2”},{“MLRATE”