当前位置: 首页 > 知识库问答 >
问题:

Spark是否在读取时维护拼花地板分区?

司徒隐水
2023-03-14

我很难找到这个问题的答案。假设我为拼花地板编写了一个数据框,并且我使用重新分区分区通过相结合来获得一个分区良好的拼花地板文件。请参阅下面:

df.repartition(col("DATE")).write.partitionBy("DATE").parquet("/path/to/parquet/file")

现在,稍后我想读取拼花文件,所以我这样做:

val df = spark.read.parquet("/path/to/parquet/file")

数据帧是否由"DATE"分区?换句话说,如果拼花地板文件被分区,火花在将其读入火花数据帧时是否会维护该分区。还是随机分区?

同样,这个答案的“为什么”和“为什么不”也会有所帮助。

共有3个答案

汤飞
2023-03-14

在您的问题中,有两种方法可以说数据正在“分区”,即:

>

  • 通过重新分区,它使用哈希分区程序将数据分发到特定数量的分区中。如果像您的问题中一样,您没有指定数字,则使用 spark.sql.shuffle.partitions 中的值,其默认值为 200。调用 .re分区通常会触发随机播放,这意味着分区现在分布在执行程序池中。

    通过 partitionBy,这是一种特定于数据帧编写器的方法,它告诉它根据键对磁盘上的数据进行分区。这意味着写入的数据被拆分为根据您的分区列命名的子目录,例如 /路径/至/镶木地板/文件/DATE=

    鉴于术语“分区”的这两种用法,在回答您的问题时有一些微妙的方面。由于您使用了< code>partitionBy并询问Spark是否“维护分区”,我怀疑您真正好奇的是Spark是否会进行分区修剪,这是一种用于显著提高在分区列上有过滤器的查询的性能的技术。如果Spark知道您寻找的值不能在特定的子目录中,它就不会浪费任何时间来读取这些文件,因此您的查询会更快地完成。

    >

  • 如果您读取数据的方式不是分区感知的,您将获得许多分区,类似于bsplosion的答案。Spark不会采用分区修剪,因此您不会获得Spark自动忽略读取某些文件以加快速度的好处1

    幸运的是,读取Spark中使用partitionBy编写的拼花文件是一种分区感知读取。即使没有像Hive这样的元存储来告诉Spark文件在磁盘上分区,Spark也会自动发现分区。请参阅Spark中的分区发现,了解它在拼花中的工作原理

    我建议测试在 spark-shell 中读取数据集,以便您可以轻松查看 .explain 的输出,这将允许您验证 Spark 是否正确找到分区,并可以修剪掉查询中不包含感兴趣数据的分区。关于这一点的一个很好的文章可以在这里找到。简而言之,如果您看到分区html" target="_blank">过滤器:[],则表示Spark没有进行任何分区修剪。但是,如果您看到类似分区过滤器的内容:[isnotnull(日期#3),(日期#3 = 2021-01-01)],Spark仅在一组特定的DATE分区中读取,因此查询执行通常要快得多。

    < sup>1一个单独的细节是< code>parquet在文件本身的列中存储有关数据的统计信息。如果这些统计数据可以用来消除与您正在进行的任何过滤都不匹配的数据块,例如< code>DATE,那么即使您读取数据的方式不是分区感知的,您也会看到一些加速。这被称为谓词下推。它之所以有效,是因为在使用< code >时,磁盘上的文件仍然只包含< code>DATE的特定值。分区依据。更多信息可以在这里找到。

  • 郑俊美
    2023-03-14

    您将根据Spark配置spark.sql.files.maxPartitionBytes得到分区的数量,默认为128MB。并且数据不会按照写入时使用的分区列进行分区。

    参考 https://spark.apache.org/docs/latest/sql-performance-tuning.html

    白光耀
    2023-03-14

    读取存储为拼花的数据时获取的分区数遵循与读取分区文本相同的许多规则:

    1. 如果火花上下文.min分区

    请注意,分区拼花文件很少具有分区的完整数据局部性,这意味着,即使数据中的分区计数与读取的分区计数相匹配,如果您试图实现分区数据局部性以提高性能,也很可能需要在内存中对数据集进行重新分区。

    鉴于您上面的用例,如果您计划在此基础上利用分区本地操作,我建议立即在“DATE”列上重新分区。上面关于分区和并行设置的警告也适用于这里。

    val df = spark.read.parquet("/path/to/parquet/file")
    df.repartition(col("DATE"))
    
     类似资料:
    • 我正在使用Spark 1.6.0。以及用于读取分区拼花数据的DataFrame API。 我想知道将使用多少个分区。 以下是我的一些数据: 2182个文件 Spark似乎使用了2182个分区,因为当我执行计数时,作业被拆分为2182个任务。 这似乎得到了的证实 对吗?在所有情况下? 如果是,数据量是否过高(即我是否应该使用df重新分区来减少数据量)?

    • 我试图利用火花分区。我试图做这样的事情 这里的问题每个分区都会创建大量的镶木地板文件,如果我尝试从根目录读取,则会导致读取缓慢。 为了避免这种情况,我试过 但是,这将创建每个分区中镶木地板文件的数目。现在我的分区大小不同了。因此,理想情况下,我希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。 我应该如何使用分区来避免写入后出现许多文件?

    • 我是Spark的初学者,试图理解Spark数据帧的机制。当从csv和parquet加载数据时,我比较了spark sql dataframe上sql查询的性能。我的理解是,一旦数据加载到spark数据框中,数据的来源(csv或parquet)应该无关紧要。然而,我看到了两者之间的显著性能差异。我使用以下命令加载数据,并对其编写查询。 请解释差异的原因。

    • 我尝试从hdfs读取现有的拼花文件使用火花sql为我的POC,但击中OOM错误。 我需要读取给定分区日期的所有分区文件。分区如下:日期/file_dir_id 日期文件夹下有1200个子文件夹 拼花文件夹结构 日期: 文件\u dir\u 1 文件\u 1。拼花地板 文件2。拼花地板 文件\u 3。拼花地板 文件\u 3。拼花地板 当我尝试读取特定日期的文件时,上面提到的数字会引发ession.r

    • 我正在从Impala迁移到SparkSQL,使用以下代码读取一个表: 我如何调用上面的SparkSQL,这样它就可以返回这样的东西:

    • 关于雪花的新功能--推断模式表函数,我有一个问题。INFER模式函数在parquet文件上执行得很好,并返回正确的数据类型。但是,当parquet文件被分区并存储在S3中时,INFER模式的功能与pyspark Dataframes不同。 在DataFrames中,分区文件夹名称和值作为最后一列读取;在雪花推断模式中有没有一种方法可以达到同样的结果? 示例: 示例:{“AGMT_GID”:1714