5.3.3.1-Spark-core-RDD-API
优质
小牛编辑
145浏览
2023-12-01
1.1 RDD 的属性
- 一个分区列表,用于并行计算
- 一个计算每个分区数据的函数
- 一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错)
- 这个rdd的分区元数据信息,其实就是该RDD怎么分区的,比如某个rdd是通过hash partitioner得到的
- 分区数据的存储地址,用来实现计算任务的本地性
- spark的计算是“流”式计算
1.2 RDD 的优势
- 高效容错
- 可以控制数据的分区来优化计算性能
- 并行处理
- 提供了丰富的操作数据的 API
- 可以显式的将任何类型的中间结果存储在内存中
1.3 RDD 的方法
|方法|功能| |------|------| |compute(split: Partition, context: TaskContext): Iterator[T]|| |getPartitions: Array[Partition]|| |getDependencies: Seq[Dependency[_]] = deps|| |getPreferredLocations(split: Partition): Seq[String] = Nil|| |val partitioner: Option[Partitioner] = None||
2. RDD 的创建
函数 | 功能 |
---|---|
sc.textFile(path) | 从文件系统中,比如本地文件系统或 HDFS 文件,得到行数据的 RDD。 |
sc.sequenceFile[KeyClass,ValueClass](path) | 加载 HDFS sequenceFile 文件。 |
sc.parallelize(Seq(1,2,3)) | 从内存中已经存在的序列列表(Seq、List、Array)中,得到 ParallelCollectionRDD |
rdd.collect rdd.glom.cloect | 转为序列列表 |
sc.range | 创建 ,得到 MapPartitionsRDD,collect 返回 Array |
makeRDD(Seq(1,2)) | parallelize 别名 |
3. 依赖
3.1 窄依赖
父亲 RDD 的一个分区数据只能被子 RDD 的一个分区消费,子 RDD 的一个分区可以对应父 RDD 的多个分区。
- OneToOneDependency:map、filter
- RangeDependency:union
- 本地性,一个父分区计算完,子分区计算。
- 失败后计算失败分区。
- 和父亲 RDD 的 Partitioner 相同,并且关联属性是分区键,则不发生 shuffle。
3.2 宽依赖
父亲 RDD 的一个分区数据被子 RDD 的多个分区消费。 - ShuffleDependency:reduceByKey
4. Partitioner
给这个RDD数据进行分区的分区器。 - 从存储系统创建的 RDD 不需要分区,HDFS 有多少数据块,就有多少分区。
- 非 key-value 结构没必要分区。
- key-value 结构需要分区,分区对象为 key。
4.1 实现
- KV 算子,作为参数传入
- 调用 RDD partitionBy 函数
- 调用 RDD coalesce 函数
- 调用 RDD repartition 函数
4.2 分区优化
- 对 RDD 预分区能提高性能。
- 是否保留父 RDD 的分区器,mapValue、flatMapValue 方法保留父 RDD 的分区器。
4.3 对比
|HashPartitioner|RangePartitioner| |------|------| ||将可以排序的 key 分到几个大概相等的范围分区内的一个分区中。| |不支持 Array 类型的 key。|不支持不能排序的 Key。| |可能导致数据倾斜。|可以解决分区数据倾斜的问题。| |分区后的数据不会排序。|分区后分区之间的数据是有序的。|5. 单类型 RDD 操作 API
5.1 Transformations
|方法|功能| |------|------| |map(func)|接收函数,将函数应用到 RDD 中到每一个元素,返回新的 RDD。| |mapPartition(func)|类似于 map,但独立地在RDD的每一个分片上运行,因此在类型为 T 的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]| |mapPartitionsWithIndex|类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]| |flatMap|对每个输入元素,可以输出多个元素。| |sample|| |filter|接收函数,返回只包含满足 filter 函数的元素的新 RDD。| |distinct|去重|
5.2 Actions
方法 | 功能 |
---|---|
collect | 返回 RDD 的所有元素。 |
count | 计数。 |
countByValue | 返回一个 Map,表示唯一元素出现的次数。 |
take | 返回几个元素。 |
top | 返回前几个元素。 |
takeOrdered | 返回基于提供的排序算法前几个元素。 |
takeSample(withReplacement,num,[seed]) | 取样 |
reduce | 合并 RDD 中的元素 |
fold | 与 reduce 相似,提供 zero value,rdd.flod(0)(+) |
aggregate() | |
foreach() | 遍历 rdd 中的每个元素。 |
6. key-value 类型 RDD 操作 API
6.1 key-value 类型 RDD 生成方法
- 自定义
- map 函数生成。
- keyBy 函数生成。
- groupBy 函数生成,在 keyBy 的基础上,将 key 相同的元素进行聚合,基于 groupByKey 实现,相当于 keyBy+groupByKey。
6.2 KeyValue 对 RDD
|函数|功能| |------|------| |combinerByKey|| |aggregateByKey|aggregateByKey((0,0))(mergeValue,mergeCombiner)可以实现 value 值、和词频统计功能。| |reduceByKey|createCombiner 不对数据进行任何处理,mergeValue 和 mergeCombiner 调用传入的 reduce 函数。| |distinct|基于 reduceByKey 实现,键值对都相同,则去重。reduce=((x,y)=>x)| |foldByKey(n)|createCombiner=mergeValue(n,value)| |groupByKey|createCombiner:元素转 ArrayBuffer 集合;mergeValue:将新元素添加进集合;mergeCombiner:集合合并。| |sortByKey|| |sortBy||MapPartitionsRDD
- 将自定义的函数应用到父亲 RDD 一个分区到输出数据中去。
- 有且仅有一个窄依赖,即只依赖一个父亲 RDD。
- 分区器:可以选择是否保留父亲 RDD 的分区器。
- 计算分区列表:继承父亲 RDD 的分区列表。
7. 二元操作 API
方法 | 功能 |
---|---|
union | 并集 |
intersection | 交集 |
subtract | 差集 |
方法 | 功能 |
---|---|
persist(StorageLevel) | 给 RDD 的 StorageLevel成员变量(默认为 None)赋值,存储级别:MEMORY_ONLY(默认)、DISK_ONLY、MEMORY_AND_DISK、OFF_HEAP。 |
cache | 分布式缓存,等于 cache(StorageLevel.MEMORY_ONLY)。 |
unpersist | 移除持久化数据。 |
ietrator | 获得当前 RDD 的输出。 |
localCheckpoint() | 本地磁盘文件,等于 cache(StorageLevel.MEMORY_AND_DISK)。 |
checkpoint() | HDFS 文件系统。 |