了解Spark的缓存和检查点机制对提高作业的计算速度和可靠性具有非常大的帮助。
persist可以将数据(RDD)缓存到内存或持久化到磁盘的方法。虽然是惰性计算,但严格来说,persist既不是转换算子,也不是行动算子,只是标记了当前RDD要进行缓存。
persist(storageLevel=StorageLevel(False, True, False, False, 1))
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
看一下StorageLevel存储级别有多少种类型:
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
咋一看有11种,其实简单分析一下,就可以简单规约为3类:只用内存、用内存和磁盘、只用磁盘
1、只用内存:MEMORY_ONLY
2、用内存和磁盘:MEMORY_AND_DISK,优先用内存,内存不够时,再用磁盘
3、只用磁盘:DISK_ONLY
另外就是后缀可带_SER和_2,_SER表示会将RDD中的数据进行序列化,RDD的每个分区会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC;_2表示会缓存两份,在计算需要高可靠性时可以使用。
cache是persist的特殊用法,即参数为MEMORY_ONLY。
Spark中的checkpoint,也就是把比较重要的中间数据(RDD)存储到一个高可用的地方(一般就是HDFS)。
好处:如果中间数据丢失了,Spark会根据依赖关系从头再算一遍,非常浪费资源,有了checkpoint后,可以直接从checkpoint里读取。
简单使用例子:
rdd = sc.parallelize(range(10))
sc.setCheckpointDir('gld')
rdd.cache()
rdd.checkpoint()
rdd.collect()
$ hdfs dfs -ls gld/68e317f6-8620-40f2-a515-60ab7f4be92f/rdd-1
Found 2 items
-rw-r--r-- 3 hive hive 42 2021-09-23 14:58 gld/68e317f6-8620-40f2-a515-60ab7f4be92f/rdd-1/part-00000
-rw-r--r-- 3 hive hive 42 2021-09-23 14:58 gld/68e317f6-8620-40f2-a515-60ab7f4be92f/rdd-1/part-00001
注意点:
1、当checkpoint执行成功了,RDD前面所有的依赖都会被销毁。
2、checkpoint也是惰性计算,遇到行动算子才会执行。
3、一般checkpoint之前加一个cache,否则checkpoint计算一次,遇到行动算子又会执行一次。