The partitioner is a core Spark concept: Spark divides data into multiple partitions according to the rule the partitioner defines, and each partition is processed in one task by an executor. Developers can easily extend the Partitioner interface to implement their own partitioner, defining numPartitions to fix the RDD's partition count and implementing the partitioning rule in getPartition.
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
Simba extends Partitioner with its own partitioners, which distribute data across Spark partitions according to attribute values. Partitioning is a key part of Simba's optimizations: once the partitions are built, Simba constructs a local index inside each partition, plus a global index over the distribution of data across partitions, so that irrelevant data can be filtered out as aggressively as possible at query time. This chapter introduces STR, a basic space-partitioning algorithm, and the concrete implementations of the various partitioners.
Before diving into the implementations, we first introduce the STR algorithm, which is used widely both in Simba's partitioners and in the spatial join operators discussed later. STR (Sort-Tile-Recursive) was proposed by Scott T. Leutenegger et al. in the paper "STR: A Simple and Efficient Algorithm for R-Tree Packing" and was originally designed for building R-Tree indexes.
Unlike the original R-Tree, whose nodes split incrementally as records are inserted (a dynamic R-Tree), an STR tree targets the bulk-load scenario in which all data is available up front and the index is built in one pass. Since no further insertions occur, the tree can be laid out to match the data distribution, producing a more balanced index with better query performance than an incrementally built R-Tree. In Simba it is mainly used to determine a reasonable number of partitions for the data, and to divide the data space into multiple MBRs for spatial joins.
Given N records in a K-dimensional space and a maximum node capacity b per STR tree node, the main steps of the STR algorithm are:
1) Sort the records by the first coordinate of each MBR's center and cut the data space into S = sqrt(N/b) vertical slices, so that each slice contains S nodes and S*b MBRs;
2) Within each vertical slice, sort by the second coordinate of the MBR centers and pack every run of b MBRs into one node;
3) Recurse on these steps until the whole R-Tree is built and no slice holds more than b MBRs (a minimal sketch of one level of this tiling follows).
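To make steps 1) and 2) concrete, here is a minimal, self-contained sketch of one level of 2-D STR tiling. The function name str2D and the center-point representation of MBRs are our own illustrative choices, not Simba code:

// rects: MBRs given by their center (cx, cy) plus a payload; b: node capacity.
// Returns the packed nodes; each inner Seq holds the (at most b) MBRs of one node.
def str2D[T](rects: Seq[((Double, Double), T)], b: Int): Seq[Seq[((Double, Double), T)]] = {
  if (rects.isEmpty) return Seq.empty
  val s = math.ceil(math.sqrt(rects.length / b.toDouble)).toInt // S vertical slices
  rects.sortBy(_._1._1)                    // step 1: sort by center x ...
    .grouped(s * b).toSeq                  // ... and cut slices of S * b MBRs each
    .flatMap(_.sortBy(_._1._2).grouped(b)) // step 2: sort each slice by center y, pack b per node
}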
Simba's hash partitioner is straightforward: records whose key hashCode leaves the same remainder modulo the partition count land in the same partition:
class HashPartitioner(num_partitions: Int) extends Partitioner {
  override def numPartitions: Int = num_partitions

  override def getPartition(key: Any): Int = {
    key.hashCode() % num_partitions
  }
}
The partition count is simply the constructor argument, and the partitioning rule is the key's hashCode modulo the partition count.
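One caveat: % on the JVM keeps the sign of its left operand, so keys with negative hash codes would yield a negative partition ID here. Spark's built-in HashPartitioner guards against this with a non-negative modulo along these lines:

// Sign-safe modulo, as in Spark's Utils.nonNegativeMod.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}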
Simba's KDTree partitioner works as follows: start with all data in one tree node, then recursively bisect each node's data along one dimension at a time into two new nodes, until every leaf holds no more than the given maximum node capacity of the KDTree. The steps are:
1) Run an aggregate over the whole RDD to compute the per-dimension minimum and maximum, the estimated storage size, and the total record count. Note that rdd.aggregate also needs a combOp to merge per-partition partial results; the excerpt below sketches one in, since the original listing stops after the seqOp:
rdd.aggregate[(Bounds, Long, Int)]((null, 0L, 0))((bound, data) => {
  // seqOp: fold one record into the running (bounds, estimated size, count)
  val new_bound = if (bound._1 == null) {
    new Bounds(data._1.coord, data._1.coord)
  } else {
    new Bounds(bound._1.min.zip(data._1.coord).map(x => Math.min(x._1, x._2)),
      bound._1.max.zip(data._1.coord).map(x => Math.max(x._1, x._2)))
  }
  (new_bound, bound._2 + SizeEstimator.estimate(data._1), bound._3 + 1)
}, (left, right) => {
  // combOp (sketched here; elided in the original excerpt): merge per-partition partials
  val new_bound =
    if (left._1 == null) right._1
    else if (right._1 == null) left._1
    else new Bounds(left._1.min.zip(right._1.min).map(x => Math.min(x._1, x._2)),
      left._1.max.zip(right._1.max).map(x => Math.max(x._1, x._2)))
  (new_bound, left._2 + right._2, left._3 + right._3)
})
2) Sample the data: if the expected sample volume fits within transfer_threshold, sample at sample_rate; otherwise cap the sampling fraction at transfer_threshold / total_size:
val seed = System.currentTimeMillis()
val sampled = if (total_size * sample_rate <= transfer_threshold) {
  rdd.sample(withReplacement = false, sample_rate, seed).map(_._1).collect()
} else {
  rdd.sample(withReplacement = true, transfer_threshold / total_size, seed).map(_._1).collect()
}
3) Following the idea above, recursively split the data from the root into leaf nodes, guided by the sample and the data bounds, and emit one MBR per leaf, so that the whole value space is carved into multiple MBRs:
def recursiveGroupPoint(entries: Array[Point], low_bound: Seq[Double],
                        high_bound: Seq[Double], cur_dim: Int): Array[MBR] = {
  val ans = mutable.ArrayBuffer[MBR]()
  val grouped = entries.sortWith((a, b) =>
    a.coord(cur_dim) < b.coord(cur_dim)).grouped(Math.ceil(entries.length / 2.0).toInt).toArray
  require(grouped.length == 2)
  val center = grouped(1).head.coord
  val new_high = 0 until dimension map { i =>
    if (i != cur_dim) high_bound(i) else center(i)
  }
  val new_low = 0 until dimension map { i =>
    if (i != cur_dim) low_bound(i) else center(i)
  }
  if (grouped(0).length >= max_entries_per_node) {
    ans ++= recursiveGroupPoint(grouped(0), low_bound, new_high, (cur_dim + 1) % dimension)
  } else {
    ans += new MBR(new Point(low_bound.toArray.clone()),
      new Point(new_high.toArray.clone()))
  }
  if (grouped(1).length >= max_entries_per_node) {
    ans ++= recursiveGroupPoint(grouped(1), new_low, high_bound, (cur_dim + 1) % dimension)
  } else {
    ans += new MBR(new Point(new_low.toArray.clone()),
      new Point(high_bound.toArray.clone()))
  }
  ans.toArray
}

val mbrs = recursiveGroupPoint(sampled, data_bounds.min, data_bounds.max, 0)
This is the core of the recursive MBR construction over the data: sort by the current dimension's values and split the data into two halves. If a half contains fewer records than the maximum node capacity max_entries_per_node, wrap all of its data in a single MBR and append it to the result array; otherwise keep splitting that half along the next dimension and merge the results into the array.
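To see the shape of these splits, here is a toy, self-contained version. Pt, Box, and kdSplit are illustrative stand-ins of our own, not Simba's Point/MBR types; eight 2-D points with a capacity of 3 yield four leaf MBRs, alternating x- and y-splits:

final case class Pt(coord: Array[Double])
final case class Box(low: Array[Double], high: Array[Double])

// Median-split on the current dim; recurse on the next dim until a group fits the capacity.
def kdSplit(pts: Array[Pt], low: Array[Double], high: Array[Double],
            dim: Int, cap: Int): Array[Box] = {
  if (pts.length < cap) return Array(Box(low, high))
  val half = math.ceil(pts.length / 2.0).toInt
  val (l, r) = pts.sortBy(_.coord(dim)).splitAt(half)
  val cut = r.head.coord(dim) // split plane, as in Simba's `center`
  val next = (dim + 1) % low.length
  kdSplit(l, low, high.updated(dim, cut), next, cap) ++
    kdSplit(r, low.updated(dim, cut), high, next, cap)
}

val pts = Array(
  Pt(Array(1.0, 1.0)), Pt(Array(2.0, 5.0)), Pt(Array(3.0, 2.0)), Pt(Array(4.0, 6.0)),
  Pt(Array(5.0, 1.0)), Pt(Array(6.0, 7.0)), Pt(Array(7.0, 3.0)), Pt(Array(8.0, 8.0)))
// Yields four boxes: (0,0)-(5,5), (0,5)-(5,9), (5,0)-(9,7), (5,7)-(9,9)
kdSplit(pts, Array(0.0, 0.0), Array(9.0, 9.0), dim = 0, cap = 3)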
4) The number of MBRs in the final array becomes the partition count, and an RTree is built over the MBR blocks. The partitioning rule locates the MBR covering a given point: circleRange with radius 0.0 returns the MBRs containing the point, and .head._2 is the matching partition ID;
// mbrBound is assumed to pair each MBR in mbrs with its index, which serves as the partition id
val rt = RTree(mbrBound.map(x => (x._1, x._2, 1)), 25) // 25: default max entries per node

override def numPartitions: Int = partitions

override def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[Point]
  rt.circleRange(k, 0.0).head._2
}
The MapD partitioner is also simple: it takes the partition count as a parameter and uses the key itself, cast to Int, as the partition ID:
class MapDPartitioner(num_partitions: Int) extends Partitioner {
  def numPartitions: Int = num_partitions

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    require(k >= 0 && k < num_partitions)
    k
  }
}
The RangeD partitioner takes an array of range_bounds and, for a given key, determines which bound's upper edge the key falls just under, using a linear scan when there are few bounds and binary search otherwise:
class RangeDPartitioner[K: Ordering: ClassTag](range_bounds: Array[K],
                                               ascending: Boolean) extends Partitioner {
  def numPartitions: Int = range_bounds.length + 1

  private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (range_bounds.length < 128) {
      while (partition < range_bounds.length && Ordering[K].gt(k, range_bounds(partition)))
        partition += 1
    } else {
      partition = binarySearch(range_bounds, k)
      if (partition < 0) partition = -partition - 1
      if (partition > range_bounds.length) partition = range_bounds.length
    }
    if (ascending) partition
    else range_bounds.length - partition
  }
}
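A hypothetical usage of the class above: three bounds induce four partitions, and keys equal to a bound stay in the lower partition because only strictly greater keys advance the scan:

val part = new RangeDPartitioner[Int](Array(10, 20, 30), ascending = true)
part.numPartitions     // 4
part.getPartition(5)   // 0
part.getPartition(10)  // 0: not strictly greater than the first bound
part.getPartition(15)  // 1
part.getPartition(35)  // 3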
The Range partitioner likewise segments one-dimensional values and maps each key to a partition ID according to the segment it falls in. Its partitioning process has four parts:
1) Sample each input partition independently using reservoir sampling (a sketch of the technique follows the excerpt):
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
  val seed = byteswap32(idx ^ (shift << 16))
  val (sample, n) = SamplingUtils.reservoirSampleAndCount(
    iter, sampleSizePerPartition, seed)
  Iterator((idx, n, sample))
}.collect()
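SamplingUtils.reservoirSampleAndCount is a Spark-internal helper; as a rough, self-contained sketch of what reservoir sampling does (our own minimal version, not Spark's code):

import scala.reflect.ClassTag
import scala.util.Random

// One pass, O(k) memory; every element survives with equal probability k/n.
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = new Array[T](k)
  var n = 0L // total number of elements seen
  input.foreach { item =>
    if (n < k) {
      reservoir(n.toInt) = item // fill the reservoir first
    } else {
      // replace a random slot with decreasing probability k/(n+1)
      val j = (rng.nextDouble() * (n + 1)).toLong
      if (j < k) reservoir(j.toInt) = item
    }
    n += 1
  }
  (if (n < k) reservoir.take(n.toInt) else reservoir, n)
}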
2) Re-sample partitions that hold more data than sampleSizePerPartition can fairly cover:
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
  if (fraction * n > sampleSizePerPartition) {
    imbalancedPartitions += idx
  } else {
    // The weight is 1 over the sampling probability.
    val weight = (n.toDouble / sample.length).toFloat
    for (key <- sample) {
      candidates += ((key, weight))
    }
  }
}
if (imbalancedPartitions.nonEmpty) {
  // Re-sample imbalanced partitions with the desired sampling probability.
  val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
  val seed = byteswap32(-rdd.id - 1)
  val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
  val weight = (1.0 / fraction).toFloat
  candidates ++= reSampled.map(x => (x, weight))
}
If a partition's record count times fraction exceeds the configured per-partition sample size (partitions hold different amounts of data, so a heavily loaded partition needs more samples than average), its index is added to imbalancedPartitions and the partition is re-sampled to guarantee enough samples everywhere. Each sample is finally weighted by its partition's record count divided by its sample count, i.e. the inverse of the sampling probability.
3) Compute a sensible upper bound for each partition from the sampling information:
def determineBounds[K: Ordering: ClassTag](
    candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight > target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
The previous step produced each sampled key and its weight; this step walks the keys in sorted order, accumulating their weights, and whenever the running total exceeds the current target it records the key as a partition separator in bounds and advances target by step, until separators for all partitions are produced. For example, with candidate keys 1 through 6 of weight 1 each and partitions = 3, step is 2 and the resulting bounds are [3, 5].
4) Derive the partition count and each key's partition from rangeBounds, using binary search over the bounds to locate the partition ID for a given key, just as in the RangeDPartitioner shown earlier.
The STR partitioner follows essentially the same four steps as the KDTree partitioner; only step 3, the recursive MBR construction, differs slightly (at each level a node is not bisected but sorted and split into several groups):
def recursiveGroupPoint(entries: Array[Point], now_min: Array[Double],
                        now_max: Array[Double], cur_dim: Int, until_dim: Int): Array[MBR] = {
  val len = entries.length
  val grouped = entries.sortWith(_.coord(cur_dim) < _.coord(cur_dim))
    .grouped(Math.ceil(len * 1.0 / dim(cur_dim)).toInt).toArray
  val ans = mutable.ArrayBuffer[MBR]()
  if (cur_dim < until_dim) {
    for (i <- grouped.indices) {
      val cur_min = now_min
      val cur_max = now_max
      if (i == 0 && i == grouped.length - 1) {
        cur_min(cur_dim) = data_bounds.min(cur_dim)
        cur_max(cur_dim) = data_bounds.max(cur_dim)
      } else if (i == 0) {
        cur_min(cur_dim) = data_bounds.min(cur_dim)
        cur_max(cur_dim) = grouped(i + 1).head.coord(cur_dim)
      } else if (i == grouped.length - 1) {
        cur_min(cur_dim) = grouped(i).head.coord(cur_dim)
        cur_max(cur_dim) = data_bounds.max(cur_dim)
      } else {
        cur_min(cur_dim) = grouped(i).head.coord(cur_dim)
        cur_max(cur_dim) = grouped(i + 1).head.coord(cur_dim)
      }
      ans ++= recursiveGroupPoint(grouped(i), cur_min, cur_max, cur_dim + 1, until_dim)
    }
    ans.toArray
  } else {
    for (i <- grouped.indices) {
      if (i == 0 && i == grouped.length - 1) {
        now_min(cur_dim) = data_bounds.min(cur_dim)
        now_max(cur_dim) = data_bounds.max(cur_dim)
      } else if (i == 0) {
        now_min(cur_dim) = data_bounds.min(cur_dim)
        now_max(cur_dim) = grouped(i + 1).head.coord(cur_dim)
      } else if (i == grouped.length - 1) {
        now_min(cur_dim) = grouped(i).head.coord(cur_dim)
        now_max(cur_dim) = data_bounds.max(cur_dim)
      } else {
        now_min(cur_dim) = grouped(i).head.coord(cur_dim)
        now_max(cur_dim) = grouped(i + 1).head.coord(cur_dim)
      }
      ans += MBR(new Point(now_min.clone()), new Point(now_max.clone()))
    }
    ans.toArray
  }
}
The data is grouped recursively dimension by dimension: sort by the current dimension's values and cut groups of b records each (with b computed as in the STR algorithm section above), then subdivide each group along the next dimension. For instance, with 10,000 sampled 2-D points and dim = (4, 4), the first pass cuts 4 vertical slices of 2,500 points each, and each slice is then split by the second coordinate into 4 groups of 625, giving 16 MBRs.
The Voronoi partitioner stores an array mapping pivot indices to partition IDs, with the partition count passed in as a parameter:
class VoronoiPartitioner(pivot_to_group: Array[Int], num_group: Int) extends Partitioner {
  override def numPartitions: Int = num_group

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    pivot_to_group(k)
  }
}
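How keys become pivot indices is left to the caller; a hypothetical upstream step (nearestPivot is our own illustrative helper, not Simba code) would key each record by its nearest pivot before this partitioner is applied:

// Index of the pivot closest to coord, by squared Euclidean distance.
def nearestPivot(coord: Array[Double], pivots: Array[Array[Double]]): Int = {
  def dist2(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  pivots.indices.minBy(i => dist2(coord, pivots(i)))
}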
The QuadTree partitioner again matches the KDTree partitioner except for step 3, the division of the value space into MBRs (the main difference being that each node splits four ways per level):
val tmp_qtree = QuadTree(sampled.zipWithIndex,
  (data_bounds.min.head, data_bounds.min(1), data_bounds.max.head, data_bounds.max(1)))

var count = 0 // running partition id handed to each leaf (assumed declared before the walk)

def searchMBROnQuadTree(node: QuadTreeNode): Array[(MBR, Int)] = {
  val ans = mutable.ArrayBuffer[(MBR, Int)]()
  if (node.children == null) {
    // a leaf: its rectangle becomes one partition MBR
    val mbr = new MBR(Point(Array(node.x_low, node.y_low)),
      Point(Array(node.x_high, node.y_high)))
    ans += (mbr -> count)
    node.objects = Array((mbr.centroid.coord(0), mbr.centroid.coord(1), count))
    count += 1
  } else for (child <- node.children) ans ++= searchMBROnQuadTree(child)
  ans.toArray
}

val mbrs = searchMBROnQuadTree(tmp_qtree.root)
The QuadTree is first built over the data's value range (the construction code belongs to the index module, so that logic is covered in detail in the index chapter); the tree is then traversed recursively in pre-order, and the MBR of every leaf node is collected into the result array.