5.3.3.1-Spark-core-RDD-API

优质
小牛编辑
140浏览
2023-12-01

1.1 RDD 的属性

  • 一个分区列表,用于并行计算
  • 一个计算每个分区数据的函数
  • 一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错)
  • 这个rdd的分区元数据信息,其实就是该RDD怎么分区的,比如某个rdd是通过hash partitioner得到的
  • 分区数据的存储地址,用来实现计算任务的本地性
  • spark的计算是“流”式计算

    1.2 RDD 的优势

  • 高效容错
  • 可以控制数据的分区来优化计算性能
  • 并行处理
  • 提供了丰富的操作数据的 API
  • 可以显式的将任何类型的中间结果存储在内存中

    1.3 RDD 的方法

    |方法|功能| |------|------| |compute(split: Partition, context: TaskContext): Iterator[T]|| |getPartitions: Array[Partition]|| |getDependencies: Seq[Dependency[_]] = deps|| |getPreferredLocations(split: Partition): Seq[String] = Nil|| |val partitioner: Option[Partitioner] = None||

2. RDD 的创建

函数功能
sc.textFile(path)从文件系统中,比如本地文件系统或 HDFS 文件,得到行数据的 RDD。
sc.sequenceFile[KeyClass,ValueClass](path)加载 HDFS sequenceFile 文件。
sc.parallelize(Seq(1,2,3))从内存中已经存在的序列列表(Seq、List、Array)中,得到 ParallelCollectionRDD
rdd.collect rdd.glom.cloect转为序列列表
sc.range创建 ,得到 MapPartitionsRDD,collect 返回 Array
makeRDD(Seq(1,2))parallelize 别名

3. 依赖

3.1 窄依赖

父亲 RDD 的一个分区数据只能被子 RDD 的一个分区消费,子 RDD 的一个分区可以对应父 RDD 的多个分区。

  • OneToOneDependency:map、filter
  • RangeDependency:union
  • 本地性,一个父分区计算完,子分区计算。
  • 失败后计算失败分区。
  • 和父亲 RDD 的 Partitioner 相同,并且关联属性是分区键,则不发生 shuffle。

    3.2 宽依赖

    父亲 RDD 的一个分区数据被子 RDD 的多个分区消费。
  • ShuffleDependency:reduceByKey

    4. Partitioner

    给这个RDD数据进行分区的分区器。
  • 从存储系统创建的 RDD 不需要分区,HDFS 有多少数据块,就有多少分区。
  • 非 key-value 结构没必要分区。
  • key-value 结构需要分区,分区对象为 key。

    4.1 实现

  • KV 算子,作为参数传入
  • 调用 RDD partitionBy 函数
  • 调用 RDD coalesce 函数
  • 调用 RDD repartition 函数

    4.2 分区优化

  • 对 RDD 预分区能提高性能。
  • 是否保留父 RDD 的分区器,mapValue、flatMapValue 方法保留父 RDD 的分区器。

    4.3 对比

    |HashPartitioner|RangePartitioner| |------|------| ||将可以排序的 key 分到几个大概相等的范围分区内的一个分区中。| |不支持 Array 类型的 key。|不支持不能排序的 Key。| |可能导致数据倾斜。|可以解决分区数据倾斜的问题。| |分区后的数据不会排序。|分区后分区之间的数据是有序的。|

    5. 单类型 RDD 操作 API

    5.1 Transformations

    |方法|功能| |------|------| |map(func)|接收函数,将函数应用到 RDD 中到每一个元素,返回新的 RDD。| |mapPartition(func)|类似于 map,但独立地在RDD的每一个分片上运行,因此在类型为 T 的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]| |mapPartitionsWithIndex|类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]| |flatMap|对每个输入元素,可以输出多个元素。| |sample|| |filter|接收函数,返回只包含满足 filter 函数的元素的新 RDD。| |distinct|去重|

5.2 Actions

方法功能
collect返回 RDD 的所有元素。
count计数。
countByValue返回一个 Map,表示唯一元素出现的次数。
take返回几个元素。
top返回前几个元素。
takeOrdered返回基于提供的排序算法前几个元素。
takeSample(withReplacement,num,[seed])取样
reduce合并 RDD 中的元素
fold与 reduce 相似,提供 zero value,rdd.flod(0)(+)
aggregate()
foreach()遍历 rdd 中的每个元素。

6. key-value 类型 RDD 操作 API

6.1 key-value 类型 RDD 生成方法

  • 自定义
  • map 函数生成。
  • keyBy 函数生成。
  • groupBy 函数生成,在 keyBy 的基础上,将 key 相同的元素进行聚合,基于 groupByKey 实现,相当于 keyBy+groupByKey。

    6.2 KeyValue 对 RDD

    |函数|功能| |------|------| |combinerByKey|| |aggregateByKey|aggregateByKey((0,0))(mergeValue,mergeCombiner)可以实现 value 值、和词频统计功能。| |reduceByKey|createCombiner 不对数据进行任何处理,mergeValue 和 mergeCombiner 调用传入的 reduce 函数。| |distinct|基于 reduceByKey 实现,键值对都相同,则去重。reduce=((x,y)=>x)| |foldByKey(n)|createCombiner=mergeValue(n,value)| |groupByKey|createCombiner:元素转 ArrayBuffer 集合;mergeValue:将新元素添加进集合;mergeCombiner:集合合并。| |sortByKey|| |sortBy||

    MapPartitionsRDD

  • 将自定义的函数应用到父亲 RDD 一个分区到输出数据中去。
  • 有且仅有一个窄依赖,即只依赖一个父亲 RDD。
  • 分区器:可以选择是否保留父亲 RDD 的分区器。
  • 计算分区列表:继承父亲 RDD 的分区列表。

7. 二元操作 API

方法功能
union并集
intersection交集
subtract差集
方法功能
persist(StorageLevel)给 RDD 的 StorageLevel成员变量(默认为 None)赋值,存储级别:MEMORY_ONLY(默认)、DISK_ONLY、MEMORY_AND_DISK、OFF_HEAP。
cache分布式缓存,等于 cache(StorageLevel.MEMORY_ONLY)。
unpersist移除持久化数据。
ietrator获得当前 RDD 的输出。
localCheckpoint()本地磁盘文件,等于 cache(StorageLevel.MEMORY_AND_DISK)。
checkpoint()HDFS 文件系统。

参考资料