当前位置: 首页 > 知识库问答 >
问题:

Spark 重新分区执行程序

弘承业
2023-03-14

我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。

为了避免分区内出现小块,我添加了一个重新分区(5 ),使每个分区内最多有5个文件:

df.repartition(5).write.orc("path")

我的问题是,在我分配的30个执行器中,只有5个在实际运行。最后我得到了我想要的东西(每个分区内有5个文件),但由于只有5个执行器在运行,所以执行时间非常长。

你有什么建议可以让我做得更快吗?

共有3个答案

夔庆
2023-03-14

Spark可以为RDD或数据帧的每个分区运行1个并发任务(最多可达集群中的核数)。如果您的集群有30个核心,那么应该至少有30个分区。另一方面,单个分区通常不应包含超过128MB的数据,并且单个混洗块不能大于2GB(参见SPARK-6235)。由于您希望减少执行时间,因此最好增加分区的数量,并在作业结束时减少特定作业的分区数量。为了更好地在分区之间(平均)分配数据,最好使用哈希分区器。

邹嘉荣
2023-03-14

您可以使用repartition和partitionBy来解决这个问题。有两种方法可以解决这个问题。

假设您需要按日期列进行分区

df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)

在这种情况下,使用的执行器的数量将等于5*distinct(dateColumn),并且您的所有日期都将包含5个文件。

另一种方法是将数据重新分区3次,然后使用< code>maxRecordsPerFile保存数据。这将创建大小相等的文件,但您将无法控制创建的文件数量

df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)
卢翔宇
2023-03-14

我用简单的方法修复了它:

df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)

并分配与输出中的分区数量相同的执行器。

谢谢大家

 类似资料:
  • 我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么

  • 根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同

  • 假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?

  • 我是Spark的新手,有一个1 TB的文件需要处理。 我的系统规格是: 每个节点:64 GB RAM 节点数:2 每个节点的核心:5 正如我所知,我必须重新分区数据以获得更好的并行性,因为火花将尝试仅通过(核心总数*2或3或4)创建默认分区。但在我的情况下,由于数据文件非常大,我必须将这些数据重新分区为一个数字,以便这些数据可以以有效的方式处理。 如何选择要在重新分区中传递的分区数??我应该如何计

  • 我使用Spark 2.1.1。 我使用结构化流从2个Kafka分区读取消息。我正在向Spark Standalone集群提交我的应用程序,其中有一个工人和两个执行者(每个2个核心)。 我想要这样的功能,来自每个Kafka分区的消息应该由每个单独的执行器独立处理。但现在正在发生的是,执行器分别读取和映射分区数据,但在映射之后,形成的无边界表被普遍使用,并且具有来自两个分区的数据。 当我对表运行结构化