当前位置: 首页 > 工具软件 > Spark Kernel > 使用案例 >

Spark_Spark Spark优化 - RDD暂存

方子安
2023-12-01

 

    编写一个Spark 作业较为容易,但是编写一个高效的 spark 并不是一个容易的事。

本篇主要从暂存的角度 讲解下  ,如何去优化 Spark Job.

 

参考文章: 

【转】Spark性能优化指南——基础篇

https://www.cnblogs.com/hark0623/p/5533803.html

 

spark Streaming 存储级别定义StorageLevel

https://blog.csdn.net/swcj/article/details/52586345

 

      Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则。

包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。

     在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

下面先讲一下,什么样的业务场景需要使用 RDD 暂存的方式。

 

 

原则一:避免创建重复的RDD


       通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。

      在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是是“RDD的血缘关系链”。

     我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

      一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。

   这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

 

一个简单的例子

     需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。

 

错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。

    这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。

    这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。

代码如下:

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") 

rdd1.map(...) 

val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") 

rdd2.reduce(...) 

 

正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。

      这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。

     但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。  要彻底解决这个问题,必须结合 “原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") 

rdd1.map(...)

rdd1.reduce(...)

 

 

 

原则二:尽可能复用同一个RDD :


     除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。

     那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

 

一个简单的例子

      有一个<Long, String>格式的RDD,即rdd1。 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,

     也就是说,rdd2是rdd1的子集。 分别对rdd1和rdd2执行了不同的算子操作。

 

错误的做法 : 
 

JavaPairRDD<Long, String> rdd1 = ... 

JavaRDD<String> rdd2 = rdd1.map(...) 

rdd1.reduceByKey(...) 

rdd2.map(...) 

 

正确的做法 : 

      上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

     其实在这种情况下完全可以复用同一个RDD。 我们可以使用rdd1,既做reduceByKey操作,也做map操作。在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
 

JavaPairRDD<Long, String> rdd1 = ... 

rdd1.reduceByKey(...) 

rdd1.map(tuple._2...) 

      第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。因此,还需要配合  “原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。
 

 

原则三:对多次使用的RDD进行持久化


      当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

      Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

      因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

 

对多次使用的RDD进行持久化的代码示例 :


      如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

正确的做法。
 

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache() 

rdd1.map(...) 

rdd1.reduce(...) 

    此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd

     

     cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。  

     persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。

 

    比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。

     而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。

     序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER) 

rdd1.map(...) 

rdd1.reduce(...)

 

所有 暂存级别 :

源自    org.apache.spark.storage.StorageLevel.scala 源码:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.memory.MemoryMode
import org.apache.spark.util.Utils

/**
 * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {

  // TODO: Also add fields for caching priority, dataset ID, and flushing.
  private def this(flags: Int, replication: Int) {
    this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
  }

  def this() = this(false, true, false, false)  // For deserialization

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")

  if (useOffHeap) {
    require(!deserialized, "Off-heap storage level does not support deserialized storage")
  }

  private[spark] def memoryMode: MemoryMode = {
    if (useOffHeap) MemoryMode.OFF_HEAP
    else MemoryMode.ON_HEAP
  }

  override def clone(): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }

  override def equals(other: Any): Boolean = other match {
    case s: StorageLevel =>
      s.useDisk == useDisk &&
      s.useMemory == useMemory &&
      s.useOffHeap == useOffHeap &&
      s.deserialized == deserialized &&
      s.replication == replication
    case _ =>
      false
  }

  def isValid: Boolean = (useMemory || useDisk) && (replication > 0)

  def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    val flags = in.readByte()
    _useDisk = (flags & 8) != 0
    _useMemory = (flags & 4) != 0
    _useOffHeap = (flags & 2) != 0
    _deserialized = (flags & 1) != 0
    _replication = in.readByte()
  }

  @throws(classOf[IOException])
  private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

  override def toString: String = {
    val disk = if (useDisk) "disk" else ""
    val memory = if (useMemory) "memory" else ""
    val heap = if (useOffHeap) "offheap" else ""
    val deserialize = if (deserialized) "deserialized" else ""

    val output =
      Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
    s"StorageLevel(${output.mkString(", ")})"
  }

  override def hashCode(): Int = toInt * 41 + replication

  def description: String = {
    var result = ""
    result += (if (useDisk) "Disk " else "")
    if (useMemory) {
      result += (if (useOffHeap) "Memory (off heap) " else "Memory ")
    }
    result += (if (deserialized) "Deserialized " else "Serialized ")
    result += s"${replication}x Replicated"
    result
  }
}


/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  /**
   * :: DeveloperApi ::
   * Return the StorageLevel object with the specified name.
   */
  @DeveloperApi
  def fromString(s: String): StorageLevel = s match {
    case "NONE" => NONE
    case "DISK_ONLY" => DISK_ONLY
    case "DISK_ONLY_2" => DISK_ONLY_2
    case "MEMORY_ONLY" => MEMORY_ONLY
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
    case "OFF_HEAP" => OFF_HEAP
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object.
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int): StorageLevel = {
    getCachedStorageLevel(
      new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object without setting useOffHeap.
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      deserialized: Boolean,
      replication: Int = 1): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object from its integer representation.
   */
  @DeveloperApi
  def apply(flags: Int, replication: Int): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(flags, replication))
  }

  /**
   * :: DeveloperApi ::
   * Read StorageLevel object from ObjectInput stream.
   */
  @DeveloperApi
  def apply(in: ObjectInput): StorageLevel = {
    val obj = new StorageLevel()
    obj.readExternal(in)
    getCachedStorageLevel(obj)
  }

  private[spark] val storageLevelCache = new ConcurrentHashMap[StorageLevel, StorageLevel]()

  private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
    storageLevelCache.putIfAbsent(level, level)
    storageLevelCache.get(level)
  }
}

 

=============================================================

 

比较重要的部分:

/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

 

 

_SER后缀 : 使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。

_2后缀 : 代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。

 

主要暂存级别解释:

 

MEMORY_ONLY    

      默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候根据世代信息重新计算。

使用cache()方法时,实际就是使用的这种持久化策略。


MYMORY_AND_DISK    

       RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不中,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。


MEMORY_ONLY_SER    

     RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间(特别是使用一个快速序列化工具的情况下),但读取数据时需要更多的CPU开销;

如果内存空间不足,处理方式与MEMORY_ONLY相同。


MEMORY_AND_DISK_SER  

   相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。


DISK_ONLY    

    仅仅使用磁盘存储RDD的数据(未经序列化)。


MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc.

   以MEMORY_ONLY_2为例,MEMORY_ONLY_2相比于MEMORY_ONLY存储数据的方式是相同的,不同的是会将数据备份到集群中两个不同的节点,其余情况类似。


OFF_HEAP(experimental)    

     RDD的数据序例化之后存储至Tachyon。相比于MEMORY_ONLY_SER,OFF_HEAP能够减少垃圾回收开销、使得SparkExecutor更“小”更“轻”的同时可以共享内存;而且数据存储于Tachyon中,Spark集群节点故障并不会造成数据丢失,因此这种方式在“大”内存或多并发应用的场景下是很有吸引力的。

需要注意的是,Tachyon并不直接包含于Spark的体系之内,需要选择合适的版本进行部署;它的数据是以“块”为单位进行管理的,这些块可以根据一定的算法被丢弃,且不会被重建。
 

 

 

如何选择一种最合适的持久化策略 ? 


     默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。

      但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

     如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

      如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

 

 

 

通常不建议使用DISK_ONLY和后缀为_2的级别:

     因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。

     后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

 

 

 

 类似资料: