我试图利用火花分区。我试图做这样的事情
data.write.partitionBy("key").parquet("/location")
这里的问题每个分区都会创建大量的镶木地板文件,如果我尝试从根目录读取,则会导致读取缓慢。
为了避免这种情况,我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
但是,这将创建每个分区中镶木地板文件的数目。现在我的分区大小不同了。因此,理想情况下,我希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。
我应该如何使用分区来避免写入后出现许多文件?
这对我很有效:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
它在每个输出分区(目录)中生成N个文件,并且(轶事)比使用coalesce
更快,并且(同样,轶事,在我的数据集上)比仅在输出上重新分区更快。
如果您正在使用S3,我还建议在本地驱动器上执行所有操作(Spark在写出期间会执行大量的文件创建/重命名/删除操作),一旦一切就绪,就使用hadoop FileUtil
(或仅aws cli)来复制所有内容:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
编辑:根据评论中的讨论:
您有一个分区列为YEAR的数据集,但是每一年的数据量都有很大的不同。因此,一年可能有1GB的数据,但另一年可能有100GB。
以下是处理此问题的一种方法的伪代码:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
val subDf = df.filter(s"YEAR = $yearVal")
val numPartitionsToUse = subDf.count / partitionSize
subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})
但是,我实际上不知道这将起什么作用。Spark可能会在读取每个列分区的可变数量的文件时遇到问题。
另一种方法是编写自己的自定义分区器,但我不知道其中涉及到什么,所以我无法提供任何代码。
让我们用一种额外的方法来扩展Raphael Roth的答案,该方法将创建每个分区可以包含的文件数量的上限,如本答案中所述:
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
首先,我会真正避免使用<code>合并</code>,因为这通常会在转换链中进一步推进,可能会破坏工作的并行性(我在这里问到这个问题:合并减少了整个阶段的并行性)
每个拼花分区写入 1 个文件非常简单(请参阅写入许多小文件的 Spark 数据帧写入方法):
data.repartition($"key").write.partitionBy("key").parquet("/location")
如果你想设置任意数量的文件(或大小相同的文件),你需要使用另一个可以使用的属性进一步重新分区你的数据(我无法告诉你在你的情况下可能是什么):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
another_key
可以是数据集的另一个属性,也可以是对现有属性使用一些模或舍入操作的派生属性。您甚至可以使用窗口函数,将row_number
放在key
上,然后将其舍入如下内容
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
这会将您的N
记录放入1个拼花文件中
使用orderBy
您还可以通过对数据帧进行相应的排序来控制文件的数量,而无需重新分区:
data.orderBy($"key").write.partitionBy("key").parquet("/location")
这将导致所有分区上总共(至少,但不会超过)< code > spark . SQL . shuffle . partitions 个文件(默认为200个)。在< code>$key之后添加第二个排序列甚至是有益的,因为parquet会记住数据帧的排序,并相应地写入统计数据。例如,您可以按ID订购:
data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")
这不会更改文件的数量,但当您查询 parquet 文件以获取给定的键
和 ID
时,它将提高性能。例如,请参阅 https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide 和 https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example
火花 2.2
从 Spark 2.2 开始,您还可以使用新选项 maxRecordsPerFile
来限制每个文件的记录数(如果文件太大)。如果您有 N 个分区,您仍将获得至少 N 个文件,但您可以将由 1 个分区(任务)写入的文件拆分为更小的块:
df.write
.option("maxRecordsPerFile", 10000)
...
例如,请参阅 http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ 和 spark 写入 N 个小于 N 个分区的磁盘
我是Spark的初学者,试图理解Spark数据帧的机制。当从csv和parquet加载数据时,我比较了spark sql dataframe上sql查询的性能。我的理解是,一旦数据加载到spark数据框中,数据的来源(csv或parquet)应该无关紧要。然而,我看到了两者之间的显著性能差异。我使用以下命令加载数据,并对其编写查询。 请解释差异的原因。
我很难找到这个问题的答案。假设我为拼花地板编写了一个数据框,并且我使用与相结合来获得一个分区良好的拼花地板文件。请参阅下面: 现在,稍后我想读取拼花文件,所以我这样做: 数据帧是否由分区?换句话说,如果拼花地板文件被分区,火花在将其读入火花数据帧时是否会维护该分区。还是随机分区? 同样,这个答案的“为什么”和“为什么不”也会有所帮助。
由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用
我已经使用Spark生成了一些分区拼花地板数据,我想知道如何将其映射到Impala表。。。遗憾的是,我还没有找到任何解决办法。 拼花地板的架构如下: 我用和对其进行了分区,这为我的hdfs提供了这种目录: 您知道我如何告诉Impala从这个数据集创建一个具有相应分区的表(并且不必像我读到的那样在每个分区上循环)?有可能吗? 提前谢谢你
我正在使用Spark 1.6.0。以及用于读取分区拼花数据的DataFrame API。 我想知道将使用多少个分区。 以下是我的一些数据: 2182个文件 Spark似乎使用了2182个分区,因为当我执行计数时,作业被拆分为2182个任务。 这似乎得到了的证实 对吗?在所有情况下? 如果是,数据量是否过高(即我是否应该使用df重新分区来减少数据量)?
我尝试从hdfs读取现有的拼花文件使用火花sql为我的POC,但击中OOM错误。 我需要读取给定分区日期的所有分区文件。分区如下:日期/file_dir_id 日期文件夹下有1200个子文件夹 拼花文件夹结构 日期: 文件\u dir\u 1 文件\u 1。拼花地板 文件2。拼花地板 文件\u 3。拼花地板 文件\u 3。拼花地板 当我尝试读取特定日期的文件时,上面提到的数字会引发ession.r