当前位置: 首页 > 知识库问答 >
问题:

Scala与Python的Spark性能

翁宏茂
2023-03-14

我更喜欢Python而不是Scala。但是,由于Spark本机是用Scala编写的,出于明显的原因,我希望我的代码在Scala版本中比在Python版本中运行得更快。

基于这个假设,我想学习&为大约1 GB的数据编写一些非常常见的预处理代码的Scala版本。数据取自Kaggle上的SpringLeaf比赛。只是为了给出数据的概述(它包含1936个维度和145232行)。数据由各种类型组成,如int,float,string,boolean。我正在使用8个核心中的6个进行火花处理;这就是为什么我使用minpartitions=6,以便每个核心都有东西要处理。

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

性能上看,这种真实数据的Scala代码运行速度似乎比Python版本慢4倍。对我来说,好消息是这给了我继续留在Python的良好动力。坏消息是我不太明白为什么?

共有1个答案

赫连睿
2023-03-14

下面可以找到讨论代码的原始答案。

首先,您必须区分不同类型的API,每种API都有自己的性能考虑。

(带有基于JVM的编排的纯Python结构)

    null

基本注意事项与前面基本相同,但有一些附加问题。虽然MLlib使用的基本结构是简单的Python RDD对象,但所有算法都是直接使用Scala执行的。

这意味着将Python对象转换为Scala对象的成本会增加,反过来也会增加内存使用量,并且我们将在后面介绍一些额外的限制。

到目前为止(Spark2.x),基于RDD的API处于维护模式,并计划在Spark3.0中删除。

随着矢量化UDF(SPARK-21190和进一步的扩展)的引入,这一点 可能会改进 有了显著的改进,该UDF使用箭头流进行高效的数据交换,并进行零拷贝反序列化。对于大多数应用程序,它们的次要开销可以忽略不计。

还要确保避免在DataFramesRDDS之间不必要地传递数据。这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输了。

值得注意的是,Py4J调用具有相当高的延迟。这包括以下简单调用:

from pyspark.sql.functions import col

col("foo")

Spark2.x中的结构化流似乎缩小了语言之间的差距,但目前它仍处于早期阶段。然而,基于RDD的API在数据库文档(访问日期2017-03-03)中已经被引用为“遗留流”,因此有理由期待进一步的统一努力。

并非所有Spark特性都通过PySpark API公开。一定要检查您需要的部分是否已经实现,并尝试了解可能的限制。

当您使用MLlib和类似的混合上下文(请参见从任务调用Java/Scala函数)时,这一点尤为重要。公平地说,PySpark API的某些部分,比如mllib.linalg,提供了比Scala更全面的方法集。

对于高度依赖Python代码库的项目,纯Python替代方案(如Dask或Ray)可能是一个有趣的替代方案。

Spark DataFrame(SQL,Dataset)API提供了一种在PySpark应用程序中集成Scala/Java代码的优雅方法。您可以使用dataframes向本机JVM代码公开数据并回读结果。我在其他地方解释了一些选项,您可以在如何在Pyspark中使用Scala类中找到一个Python-Scala往返的工作示例。

通过引入用户定义的类型(请参见如何在Spark SQL中为自定义类型定义模式?),可以进一步增强它。

(免责声明:Pythonista的观点。很可能我错过了Scala的一些技巧)

首先,您的代码中有一个部分根本没有意义。如果您已经使用zipwithindexenumerate创建了(key,value)对,那么创建字符串然后再拆分它有什么意义呢?flatmap不能递归工作,因此您可以简单地生成元组,并跳过对map的跟踪。

通常情况下,我不会详细讨论这个问题,但据我所知,这是Scala代码中的一个瓶颈。在JVM上连接字符串是一个相当昂贵的操作(例如,请参见:scala中的字符串连接是否与Java中的字符串连接一样昂贵?)。这意味着类似于_.reducebykey((v1:String,v2:String)=>v1+','+v2)(相当于代码中的input4.reducebykey(valsConcat))的东西不是一个好主意。

如果要避免使用groupbykey,可以尝试将aggregatebykeystringbuilder一起使用。类似的方法应该能起到作用:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

但我怀疑这值得大惊小怪。

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

巨蟒:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

本地[6]模式下(Intel(R)至强(R)CPU E3-1245 V2@3.40GHz),每个执行器需要4GB内存(n=3):

  • Scala-mean:250.00s,stdev:12.49
  • Python-均值:246.66s,标准发展:1.15

我非常肯定,大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,下面是用Python编写的幼稚单线程代码,它在不到一分钟的时间内在这台机器上执行相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
 类似资料:
  • 我正在构建我的第一个spark应用程序。 http://spark.apache.org/downloads.html告诉我火花2。x是根据Scala 2.11构建的。 在Scala网站上https://www.scala-lang.org/download/all.html我看到的版本是2.11.0-2.11.11 所以我的问题是:火花网站上的2.11到底是什么意思。它是2.11.0 - 2.1

  • 我对Spark很陌生,目前正在通过玩pyspark和Spark-Shell来探索它。 现在的情况是,我用pyspark和Spark-Shell运行相同的spark作业。 这是来自Pyspark: 使用spark-shell,工作在25分钟内完成,使用pyspark大约55分钟。如何让Spark独立地用pyspark分配任务,就像它用Spark-shell分配任务一样?

  • 我有一个RDD,其模式如下: (我们称之为) 我希望创建一个新的RDD,每一行都为,键和值属于。 我希望输出如下: 有人能帮我处理这段代码吗? 我的尝试: 错误:值映射不是Char的成员 我理解这是因为map函数只适用于,而不是每个。请帮助我在中使用嵌套函数。

  • 我正在尝试使用spark df读取spark中的CSV文件。文件没有标题列,但我想有标题列。如何做到这一点?我不知道我是否正确,我写了这个命令- 并将列名作为列的_c0和_c1。然后我尝试使用:val df1=df.with列重命名("_c0","系列")将列名更改为所需的名称,但我得到"with列重命名"不是单元上的成员。 PS:我已经导入了spark.implicits._和spark.sql

  • {“IFAM”:“EQR”,“KTM”:1430006400000,“COL”:21,“Data”:[{“MLRATE”:“30”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“31”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“30”,“NROUT”:“5”,“UP”:“NULL”:“2”},{“MLRATE”