首先,这三者都是做RDD持久化的,cache()和persist()是将数据默认缓存在内存中,checkpoint()是将数据做物理存储的(本地磁盘或Hdfs上),当然rdd.persist(StorageLevel.DISK_ONLY)也可以存储在磁盘 。
其次,缓存机制里的cache和persist都是用于将一个RDD进行缓存,区别就是:cache()是persisit()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY)将数据持久化到内存中。如果需要从内存中清除缓存,那么可以使用unpersist()方法。cache () = persist()=persist(StorageLevel.Memory_Only)
另外,cache 跟 persist不会截断血缘关系,checkPoint会截断血缘关系。
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
从以上源码可以看出,cache 底层调用的是 persist 方法,存储等级为: memory only,persist 的默认存储级别也是 memory only,persist 与 cache 的主要区别是 persist 可以自定义存储级别。如rdd.persist(StorageLevel.DISK_ONLY) ,就可以将数据存储在磁盘上。哪些 RDD 需要cache?会被重复使用的(但是)不能太大的RDD需要cache,cache 只使用 memory。
cache 和 checkpoint 之间有一个重大的区别,cache 将 RDD 以及 RDD 的血统(记录了这个RDD如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是 checkpoint 将 RDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是RDD之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
从上面的源码中可以看出,在进行checkpoint之前要设置 sc.setCheckpointDir("...")。除此之外,在进行 checkpoint 前,要先对 RDD 进行 cache。Why??? checkpoint 会等到 job 结束后另外启动专门的 job 去完成 checkpoint,也就是说需要 checkpoint 的 RDD 会被计算两次。
persist(StorageLevel.DISK_Only) != checkPoint()
rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。
cache: 对于会被重复使用,但是数据量不是太大的RDD,可以将其cache()到内存当中。cache 是每计算出一个要 cache 的 partition 就直接将其 cache 到内存中。缓存完之后,可以在任务监控界面storage里面看到缓存的数据。
checkpoint:对于computing chain 计算链过长或依赖其他 RDD 很多的 RDD,就需要进行checkpoint,将其放入到HDFS或者本地文件夹当中。需要注意的是,checkpoint 需要等到job完成了,再启动专门的job去完成checkpoint 操作,因此RDD是被计算了两次的。一般使用的时候配合rdd.cache(),这样第二次就不用重新计算RDD了,直接读取 cache 写磁盘。
注意:rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别,persist一旦程序执行结束,所有的缓存无论在内存还是磁盘都会被删掉。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver,program 使用,而 cached RDD 不能被其他 dirver program 使用。
spark的缓存级别参照【org.apache.spark.storage.StorageLevel.scala】
在persist()中可以指定一个StorageLevel,当StorageLevel为MEMORY_ONLY时就是cache.
StorageLevel的列表可以在StorageLevel伴生单例对象中查看 :
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1) //hdfs的副本是存3份,RDD默认的副本数是1
extends Externalizable
new StorageLevel(_useDisk,_useMemory, _useOffHeap,_deserialized,_replication: Int = 1)
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
默认缓存级别:def persist(): this.type = persist(StorageLevel.MEMORY_ONLY),
默认情况下persist() 会把数据以反序列化的形式缓存在JVM的堆(heap)空间中。
取消缓存,执行RDD.unpersist()
1.缓存cache()的使用
object Test01 {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local"))
val rdd1 = sc.textFile("file:///D:\\IdeaProgram\\Bigdata\\file\\Spark\\wc\\input")
//在这里设置一个cache缓存,只缓存结果,建议缓存完之后执行action,再去执行转换算子+执行算子
val rdd2 = rdd1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).cache()
//这里如果不设置缓存,每次执行完一个action以后再去执行下一个action的时候都要从头开始进行计算
//如果设置了缓存,每次执行完一个action以后再去执行下一个action的时候直接从缓存为止读取数据
rdd2.collect()
rdd2.foreach(println(_))
rdd2.take(2)
}
}
2.persist()的使用
import org.apache.spark.storage._
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("DISK_ONLY")
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR")
val a = sc.parallelize(1 to 9, 3).persist(StorageLevel.DISK_ONLY)
println(a.first())
}
一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager也会stop;被缓存到磁盘上的RDD也会被清空(整个blockManager使用的local文件夹被删除)
3.checkpoint()的使用
checkpoint检查点机制
检查点(本质就是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点问题而丢失分区,从做查点的RDD开始重做lineage,就会减少开销检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能.
检查点,类似于快照,chekpoint的作⽤就是将DAG中⽐较重要的数据做⼀个检查点,将结果存储到⼀个⾼可⽤的地⽅
//检查点的使用
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
val sc = new SparkContext(conf)
//设置检查点的路径
sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd = sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//检查点的触发⼀定要使⽤个action算⼦
rdd.checkpoint()
rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")
println(rdd.getCheckpointFile) //查看存储的位置
/**
查看是否可以设置检查点 rdd.isCheckpointed 这个⽅法在shell中可以使⽤ 但是代码中不好⽤
*/
}
4.实用心得:
缓存和检查点的区别
缓存把RDD计算出来的然后放在内存,但是RDD的依赖链不能丢掉,当某个exexutor宕机时,上面cache的RDD就会丢掉,需要通过依赖链放入重新计算,不同的是,checkpoint是把RDD保存在HDFS上,是多副本可靠存储,所以依赖链可以丢掉,就是斩断了依赖链,是通过复制实现的高容错
cache和persist的比较
1.cache底层调用的是persist
2.cache默认持久化等级是内存且不能修改,persist可以修改持久化的等级
什么时候使用cache或checkpoint
1.某步骤计算特别耗时
2.计算链条特别长
3.发生shuffle之后
一般情况建议使用cache或是persist模式,因为不需要创建存储位置,默认存储到内存中计算速度快,而checkpoint需要手动创建存储位置和手动删除数据,若数据量非常庞大建议使用checkpoint
Task包括ResultTask(最后一个阶段的任务) + ShuffleMapTask(非最后一个阶段)