当前位置: 首页 > 知识库问答 >
问题:

Coalesce可以增加Spark DataFrame的分区吗

苍恩
2023-03-14

我试图理解coalesce()re分区()之间的区别。

如果我正确理解了这个答案,coalesce()只能减少dataframe的分区数量,如果我们尝试增加分区数量,那么分区数量将保持不变。

但当我试图执行下面的代码时,我发现了两件事

  1. 对于合并的Dataframe,可以增加分区数
  2. 对于Rdd,如果Shuffle=false,那么分区的数量不能随着合并而增加。

这是否意味着使用合并数据帧分区可以增加?

当我执行以下代码时:

val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
println("Original dataframe partitions = " + h1b1Df.rdd.getNumPartitions)

val coalescedDf = h1b1Df.coalesce(2)
println("Coalesced dataframe partitions = " + coalescedDf.rdd.getNumPartitions

val coalescedDf1 = coalescedDf.coalesce(6) 
println("Coalesced dataframe with increased partitions = " + coalescedDf1.rdd.getNumPartitions) 

我得到以下输出

Original dataframe partitions =  8
Coalesced dataframe partitions = 2
Coalesced dataframe with increased partitions = 6

当我执行以下代码时:

val inpRdd = h1b1Df.rdd
println("Original rdd partitions = " + inpRdd.getNumPartitions)

val coalescedRdd = inpRdd.coalesce(4)
println("Coalesced rdd partitions = " + coalescedRdd.getNumPartitions)

val coalescedRdd1 = coalescedRdd.coalesce(6, false)
println("Coalesced rdd with increased partitions = " + coalescedRdd1.getNumPartitions)

我得到以下输出:

Original rdd partitions =  8
Coalesced rdd partitions = 4
Coalesced rdd with increased partitions = 4

共有3个答案

龙承德
2023-03-14

如果您在同一个数据帧上应用了几个合并,而没有在这些合并之间执行任何转换,Spark将通过仅应用最新的合并来优化您的合并,给您的印象是您可以使用合并增加分区数量

如果我们使用以下代码片段:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local[8]") // use 8 threads
    .appName("test-app")
    .getOrCreate()

import spark.implicits._

val input = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("MyCol")
val simpleCoalesce = input.coalesce(4)
val doubleCoalesce = simpleCoalesce.coalesce(8)

println(doubleCoalesce.rdd.getNumPartitions)

它将打印8。为了更好地理解我们为什么会得到这个结果,我们在双重合并上执行解释(true)

doubleCoalesce.explain(true)

我们通过以下步骤来构建最终的spark执行计划,以解决doubleCoalesce问题:

== Parsed Logical Plan ==
Repartition 8, false
+- Repartition 4, false
   +- Project [value#1 AS MyCol#4]
      +- LocalRelation [value#1]

...

== Physical Plan ==
Coalesce 8
+- LocalTableScan [MyCol#4]

我们可以看到,在解析的逻辑计划(对代码片段的简单解析)和物理计划(将应用于生成最终数据帧的计划)之间,只保留了最新的合并,因此实际上从未应用coalesce(4)转换。因此,只有coalesce(8)被应用,我们得到一个有8个分区的数据帧。

如果我们想要应用合并(4),我们需要在两个合并之间执行转换,例如复杂的选择

import org.apache.spark.sql.functions.col

val separatedCoalesce = simpleCoalesce
  .select((col("MyCol") + 0).as("MyCol"))
  .coalesce(8)

println(separatedCoalesce.rdd.getNumPartitions)

注意:简单选择为。选择(col(“MyCol”)将不起作用,因为spark将在优化阶段放弃选择

此代码打印4。如果我们看一下分离合并的物理计划:

== Physical Plan ==
Coalesce 8
+- *(1) Project [(MyCol#4 + 0) AS MyCol#9]
   +- Coalesce 4
      +- LocalTableScan [MyCol#4]

我们可以看到,这里应用了coalesce(4),因此最后我们得到了一个只有4个分区的数据帧,尽管我们应用了coalesce(8)

Spark优化可能很棘手,会让你觉得有些事情没有发生。所以请记住,spark并不是完全执行编写的代码,而是它的优化版本。

侯涵煦
2023-03-14

Coalesce for dataframe增加的分区数不能超过群集中的核心总数。

 val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
 h1b1Df.rdd.getNumPartitions        // prints 8

 val cloasedDf = h1b1Df.coalesce(21)  
 cloasedDf.rdd.getNumPartitions     // prints 8

 val cloasedDf1 = cloasedDf.coalesce(2) // prints 2
 cloasedDf1.rdd.getNumPartitions

 val cloasedDf2 = cloasedDf.coalesce(7) // prints 7
 cloasedDf2.rdd.getNumPartitions
拓拔元徽
2023-03-14

Coalesce可以通过设置shuffle=true来增加分区,这等于重新分区。当使用coalesce with shuffle=false来增加时,数据移动不会发生。所以一个分区的数据不能移动到另一个分区。而reduce只合并最近的分区。

谢谢,

 类似资料:
  • 本文向大家介绍topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?相关面试题,主要包含被问及topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?时的应答技巧和注意事项,需要的朋友参考一下 可以增加 bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-co

  • 本文向大家介绍Kafka 分区数可以增加或减少吗?为什么?相关面试题,主要包含被问及Kafka 分区数可以增加或减少吗?为什么?时的应答技巧和注意事项,需要的朋友参考一下 我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,

  • 问题内容: 有谁知道是否有任何执行以下操作的闩锁实现: 有一种减少锁存器值的方法,或者如果该值为零则等待 具有等待锁存器值为零的方法 具有将数字添加到锁存器值的方法 问题答案: 除了从AQS开始,您可以使用下面的简单实现。它有些天真(它是同步的,而AQS是无锁算法),但是除非您希望在满足条件的情况下使用它,否则它可能已经足够好了。 注意:我尚未对其进行深入测试,但它的行为似乎与预期的一样: 输出:

  • 我有一个使用Kafka 1.0作为队列的应用程序。Kafka主题有80个分区和80个正在运行的使用者。(Kafkapython消费者)。 通过运行命令: 我看到其中一个分区被卡在一个偏移位置,并且随着新记录的添加,延迟会不断增加。 上面命令的输出如下所示: 这是什么原因?此外,不需要使用重置偏移量命令重置偏移量,因为可能不会定期手动监视此服务器。 客户端作为Linux m/c中的并行进程在后台运行

  • 我不确定在进行聚合操作时应该增加还是减少分区数量。假设我正在使用pyspark数据框架。。 我知道行转换通常需要更多的分区。而将数据保存到磁盘通常需要fewere分区。 但是,对于聚合,我不清楚在中做什么?? 增加分区数的参数:由于我们必须为聚合而洗牌数据,因此您希望洗牌更少的数据,从而增加分区数,以减小分区的大小。 减少分区数量的论点:IT需要大量开销来收集和计算每个分区。因此,太多的分区将导致

  • Spark中的任务数由阶段开始时的RDD分区总数决定。例如,当Spark应用程序从HDFS读取数据时,Hadoop RDD的分区方法继承自MapReduce中的,它受HDFS块的大小、的值和压缩方法等的影响。 截图中的任务花了7,7,4秒,我想让它们平衡。另外,阶段被分成3个任务,有什么方法可以指定Spark的分区/任务数吗?