大空间数据变得无处不在。因此,为基于位置的服务(LBS)中的众多应用提供快速、可扩展和高吞吐量的空间查询和分析是至关重要的。
传统的空间数据库和空间分析系统基于磁盘,并针对IO效率进行了优化。但越来越多的数据存储和处理在内存中以实现低延迟,CPU时间成为新的瓶颈。我们介绍了Simba(空间内存大数据分析)系统,该系统为大空间数据提供可扩展和高效的内存空间查询处理和分析。Simba基于Spark,运行在一组商品机器上。特别是,Simba扩展了Spark SQL引擎,通过SQL和DataFrame API支持丰富的空间查询和分析。它在RDD上引入索引,以便处理大空间数据和复杂的空间操作。最后,Simba实现了一个有效的查询优化器,它利用其索引和新颖的空间感知优化来实现低延迟和高吞吐量。在大型数据集上的大量实验表明,与其他空间分析系统相比,Simba的性能更优越。
近年来,空间数据量呈爆炸式增长。智能手机上的移动应用程序和各种物联网(IoT)项目(例如,智能城市的传感器测量)产生大量空间维度的数据。此外,空间维度通常在这些应用程序中扮演重要角色,例如,用户和司机位置是优步应用程序最关键的功能。如何以低延迟和高吞吐量查询和分析如此大的空间数据是一个基本挑战。大多数传统和现有的空间数据库和空间分析系统都是面向磁盘的(例如,Oracle spatial、SpatialHadoop和Hadoop GIS[11,22])。由于它们已针对IO效率进行了优化,因此当扩展到大型空间数据时,它们的性能往往会恶化。
当今实现低延迟和高吞吐量的一个流行选择是在商品机器集群上使用内存计算。Spark[38]等系统通过使用分布式内存存储和计算提供低查询延迟和高分析吞吐量,在大数据处理方面取得了巨大成功。最近,Spark SQL[13]使用类似SQL的查询接口和DataFrame API扩展了Spark,以对不同的底层数据源(例如,来自DFS的数据)进行关系处理。这样的扩展提供了有用的抽象,以支持分布式内存空间上的简单和用户友好的大数据分析。此外,SQL的声明性特性还为查询优化提供了丰富的机会,同时极大地简化了最终用户的工作。
然而,现有的分布式内存查询和分析引擎,如Spark、Spark SQL和MemSQL,都没有为空间查询和分析提供本地支持。为了使用这些系统处理大型空间数据,必须依赖UDF或用户程序。由于UDF(或用户程序)位于查询引擎内核之外,因此底层系统无法优化工作负载,这通常会导致非常昂贵的查询评估计划。例如,当Spark SQL通过UDF实现空间距离连接时,它必须使用昂贵的笛卡尔乘积方法,这对于大数据来说是不可扩展的。
受这些观察的启发,我们设计并实现了Simba(空间内存大数据分析)系统,这是一个分布式内存分析引擎,以支持对大空间数据的空间查询和分析,主要目标如下:简单而富有表现力的编程界面、低查询延迟、高分析吞吐量和出色的可扩展性。特别是,辛巴有以下独特的特点:
Simba通过一类重要的空间操作扩展了Spark SQL,并在SQL和DataFrame API中为它们提供了简单而富有表现力的编程接口。
Simba支持RDD(弹性分布式数据集)上的(空间)索引,以实现低查询延迟。
Simba设计了一个SQL上下文模块,它并行执行多个空间查询,以提高分析吞吐量。
Simba将空间感知优化引入逻辑和物理优化器,并使用基于成本的优化(CBO)来选择好的空间查询计划。
由于Simba基于Spark,它继承并扩展了Spark的容错机制。与依赖UDF来支持空间查询和分析的Spark SQL不同,**Simba通过其索引支持、查询优化器和查询评估器来支持此类操作。**由于这些模块是针对空间操作而定制的,Simba在对大型空间数据进行空间查询和分析时实现了出色的可扩展性。
本文的其余部分组织如下。我们在第2节中介绍了必要的背景知识,并在第3节中提供了Simba的系统概述。第4节介绍了Simba的编程接口,第5节讨论了其索引支持。第6节描述了Simba中的空间操作,而第7节解释了Simba的查询优化器和容错机制。第8节给出了大型真实数据集的大量实验结果。第9节总结了除第2节讨论的工作外的相关工作,第10节总结了本文。
ApacheSpark[38]是一个通用的、广泛使用的用于大数据处理的集群计算引擎,具有Scala、Java和Python中的API。自成立以来,基于Spark的内存大数据分析丰富生态系统已经建立,包括流媒体、图形处理和机器学习库[6]。
Spark为内存集群计算提供了一种高效的抽象,称为弹性分布式数据集(RDD)。每个RDD都是跨集群分区的Java或Python对象的分布式集合。用户可以通过Spark提供的函数式编程API(如map、filter、reduce)操纵RDD,这些API以编程语言获取函数,并将其发送到集群上的其他节点。例如,我们可以使用以下标量代码计算文本文件中包含“ERROR”的行数:
lines = spark.textFile("hdfs://...")
errors = lines.filter(l => l.contains("ERROR"))
println(errors.count())
此示例通过读取HDFS文件创建一个名为行的字符串RDD,并使用筛选器操作获取另一个RDD错误,该错误由仅包含“ERROR”的行组成。最后,对输出的错误进行计数。
RDD是容错的,因为Spark可以通过重新运行操作来重建丢失的分区,从而使用沿袭图恢复丢失的数据。
RDD也可以缓存在内存中或显式地持久化在磁盘上,以加速数据重用和支持迭代[38]。RDD被延迟评估。每个RDD实际上代表了一个计算数据集的“逻辑计划”,它由原始输入RDD上的一个或多个“转换”组成,而不是物理的物化数据本身。Spark将等待某些输出操作(称为“操作”)(如collect)启动计算。这允许引擎进行一些简单的优化,例如流水线操作。回到上面的示例,Spark将通过应用过滤器和计数记录从HDFS文件中流水线读取行。由于这个特性,Spark永远不需要实现中间行和错误结果。虽然这种优化非常有用,但它也很有限,因为引擎不理解RDD中的数据结构(可以是任意Java/Python对象)或用户函数的语义(可能包含任意代码和逻辑)。
Simba基于Spark SQL[13],专门针对多维数据集上的大规模空间查询和分析进行了优化。Simba继承并扩展了SQL和DataFrame API,因此用户可以方便地指定不同的空间查询和分析来与底层数据交互。这个过程中的一个主要挑战是扩展SQL和DataFrame API,以支持Simba内核内的丰富空间操作。
图1显示了Simba的整体架构。Simba遵循与Spark SQL类似的体系结构,但在整个系统堆栈中引入了新的特性和组件。特别是,图1中的橙色框突出显示了与Spark SQL不同的新模块。与Spark SQL类似,Simba允许用户通过命令行(CLI)、JDBC和scala/python程序与系统交互。它可以连接到各种各样的数据源,包括来自HDFS(Hadoop分布式文件系统)、关系数据库、Hive和本地RDD的数据源。
Simba中的一个重要设计选择是不使用核心spark引擎,只对spark SQL的内核进行更改。这一选择使得一些实现更具挑战性(例如,在不修改Spark内核的情况下添加对空间索引的支持),但它允许将Simba轻松迁移到将来发布的Spark新版本中。
indexing
空间查询的处理成本很高,尤其是对于多维空间中的数据和空间连接和kNN等复杂操作。为了获得更好的查询性能,Simba在其内核中引入了索引的概念。特别是,Simba在Spark中的RDD上实现了几个经典的索引结构,包括哈希映射、树映射和R-树[14,23]。Simba采用两级索引策略,即本地索引和全局索引。
全局索引从每个RDD分区收集统计信息,并帮助系统修剪不相关的分区。在每个RDD分区内,构建本地索引以加速本地查询处理,从而避免扫描整个分区。在Simba中,用户可以通过索引管理命令随时在任何表上建立和删除索引。通过构建一个名为IndexRDD的新抽象,该抽象扩展了Spark中的标准RDD结构,索引可以持久化到磁盘,并可以轻松地与相关数据一起加载回内存。我们将在第5节中描述Simba的索引支持。
spatial operation
Simba支持对点和矩形对象进行许多流行的空间操作。这些空间操作是基于本地Spark RDD API实现的。为每个操作提供了多个访问和评估路径,因此最终用户和Simba的查询优化器可以自由选择最合适的方法。第6节讨论了Simba如何支持各种空间操作。
新颖性和贡献。据我们所知,Simba是第一个完整的(即,通过复杂的查询引擎和查询优化器支持SQL和DataFrame)内存空间查询和分析引擎,支持机器集群。尽管我们的架构基于Spark SQL,但在内存、分布式和并行环境中实现高效和可扩展的空间查询解析、空间索引、空间查询算法和空间感知查询引擎仍然是非常重要的,并且需要大量的设计和实现工作,因为Spark SQL是针对关系查询处理而定制的。总之,
•我们提出了一种系统架构,该架构可调整Spark SQL以支持丰富的空间查询和分析。
•我们在Spark中设计了两级索引框架和一个新的RDD抽象,以在引擎内部本地构建RDD上的空间索引。
•我们在分布式和并行环境中,在RDD抽象所带来的约束下,为高效和可扩展地执行空间运算符提供了新的算法。
•利用空间索引支持,我们在空间感知查询优化器中引入了新的逻辑和基于成本的优化;由于缺少对空间索引的支持,许多这样的优化在Spark SQL中是不可能的。我们还针对特定的空间操作(如kNN连接)利用分区调整和查询优化。
索引对于空间查询和分析的有效处理非常重要,尤其是对于多维数据和复杂的空间操作(如kNN和空间连接)。特别是,索引是Simba中构建有效优化器的关键组件。由于Simba是内存分析引擎,因此减少磁盘IO不是索引的主要重点。相反,Simba利用索引来减少查询延迟,并通过降低CPU成本来提高查询吞吐量。例如,索引可以帮助Simba在处理范围查询时删除不相关的RDD分区,从而为基础Spark引擎释放更多的CPU资源,从而提高查询吞吐量。
Simba直接在RDD上构建(空间)索引,以加快查询处理速度。具体而言,表表示为记录的RDD(即RDD[Row])。因此,表的索引记录成为RDD中的索引元素。然而,RDD是为顺序扫描而设计的,因此随机访问非常昂贵,因为它可能成为RDD上的完整扫描。一个额外的复杂性是,出于第3节中解释的原因,我们希望在不更改Spark内核的情况下引入索引支持。为了克服这些挑战,我们通过引入称为IndexRDD[Row]的新抽象来改变索引表的存储格式,并采用两级索引策略,该策略可以适应各种索引结构以支持Simba中的不同查询。
索引RDD。回想一下,表中的记录存储为Row对象(Row对象),每个表存储为包含多个分区的Row的RDD。为了在表上添加索引支持,我们将RDD分区内的所有记录(即Row对象)打包到一个数组中,这样每个记录都有一个唯一的下标作为其索引。这种改变使得RDD分区内的随机访问成为一种具有O(1)成本的有效操作。为了在Simba中实现这一点,我们介绍了如下IPartition数据结构: