当前位置: 首页 > 知识库问答 >
问题:

使用pyspark对parquet文件进行分区和重新分区

壤驷坚
2023-03-14

步骤3我通过for循环加载每个模块分区,执行聚合,并以追加模式将其保存为文件夹,这样我就有9个模块作为文件夹:S3://path/module a/S3://path/module b等。它们不按模块分区,只是保存为文件夹。由于我的默认spark numpartitions是201,每个模块文件夹都有201文件,因此总共有9*201=1809文件

步骤4到目前为止还不错,但是我需要按date把它分区回来。因此,我循环遍历每个模块分区,并将文件保存为一个没有任何分区的parquet文件。这导致总共有2751文件。我不知道这是怎么算出来的。

步骤5然后加载整个未分区的文件,并按date进行分区保存。这导致了大约39k文件,每个文件约为1.5MB。因此,我有大量的小文件,加载parquet或对它们执行任何操作,如groupby等,都需要很长的时间。

在做了更多的阅读之后,我尝试在步骤4中使用repartition(1).partitionby('date')来减少文件的数量,但是在接近尾声时失败了。从第四步开始我就知道我在做错事。有没有更有效的方法来做整件事?

谢谢。

共有1个答案

闻人宝
2023-03-14

找到正确的分区数量是你关心的。

假设您有86天的数据,并且希望按日期分区保存它。然后您应该知道要在一个分区下创建多少个文件。

假设每个日期有3 GB数据,那么每个日期文件夹中可能至少需要6个文件。

df.repartition(6,'date').write.partitionBy('date')...
df.repartition(6, 'date').write.option("maxRecordsPerFile", 10000).partitionBy('date')...
 类似资料:
  • 既然这样,为什么还会有人使用重新分区呢?我想我唯一能看到它被使用的时候是如果我没有使用PairRDD,或者我有很大的数据偏差? 我是否遗漏了什么,或者有人能从不同的角度为我照亮吗?

  • 我有数千个压缩文件,每个压缩文件的大小为 2GB,位于 HDFS 中。我正在使用火花来处理这些文件。我正在使用Spark textFile()方法从HDFS加载文件。我的问题是如何重新分区数据,以便我可以并行处理每个文件。目前,每个.gz文件都在单个任务中处理。因此,如果我处理 1000 个文件,则只执行 1000 个任务。我知道,压缩文件是不可拆分的。但是,我可以使用其他方法来更快地运行我的作业

  • 我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么

  • 我是Spark的新手,有一个1 TB的文件需要处理。 我的系统规格是: 每个节点:64 GB RAM 节点数:2 每个节点的核心:5 正如我所知,我必须重新分区数据以获得更好的并行性,因为火花将尝试仅通过(核心总数*2或3或4)创建默认分区。但在我的情况下,由于数据文件非常大,我必须将这些数据重新分区为一个数字,以便这些数据可以以有效的方式处理。 如何选择要在重新分区中传递的分区数??我应该如何计

  • 我有一个具有如下模式的dataframe:

  • 问题内容: 如何在Java 8 Stream上实现“分区”操作?划分是指将流分成给定大小的子流。它在某种程度上与Guava Iterators.partition()方法相同,只是希望分区是延迟评估的Streams,而不是List的Streams。 问题答案: 将任意源流划分为固定大小的批次是不可能的,因为这会加重并行处理。并行处理时,你可能不知道拆分后的第一个子任务中有多少个元素,因此你无法为下