我试图理解coalesce()
和re分区()
之间的区别。
如果我正确理解了这个答案,coalesce()
只能减少dataframe的分区数量,如果我们尝试增加分区数量,那么分区数量将保持不变。
但当我试图执行下面的代码时,我发现了两件事
这是否意味着使用合并数据帧分区可以增加?
当我执行以下代码时:
val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
println("Original dataframe partitions = " + h1b1Df.rdd.getNumPartitions)
val coalescedDf = h1b1Df.coalesce(2)
println("Coalesced dataframe partitions = " + coalescedDf.rdd.getNumPartitions
val coalescedDf1 = coalescedDf.coalesce(6)
println("Coalesced dataframe with increased partitions = " + coalescedDf1.rdd.getNumPartitions)
我得到以下输出
Original dataframe partitions = 8
Coalesced dataframe partitions = 2
Coalesced dataframe with increased partitions = 6
当我执行以下代码时:
val inpRdd = h1b1Df.rdd
println("Original rdd partitions = " + inpRdd.getNumPartitions)
val coalescedRdd = inpRdd.coalesce(4)
println("Coalesced rdd partitions = " + coalescedRdd.getNumPartitions)
val coalescedRdd1 = coalescedRdd.coalesce(6, false)
println("Coalesced rdd with increased partitions = " + coalescedRdd1.getNumPartitions)
我得到以下输出:
Original rdd partitions = 8
Coalesced rdd partitions = 4
Coalesced rdd with increased partitions = 4
如果您在同一个数据帧上应用了几个合并,而没有在这些合并之间执行任何转换,Spark将通过仅应用最新的合并来优化您的合并,给您的印象是您可以使用合并增加分区数量
如果我们使用以下代码片段:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[8]") // use 8 threads
.appName("test-app")
.getOrCreate()
import spark.implicits._
val input = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("MyCol")
val simpleCoalesce = input.coalesce(4)
val doubleCoalesce = simpleCoalesce.coalesce(8)
println(doubleCoalesce.rdd.getNumPartitions)
它将打印8
。为了更好地理解我们为什么会得到这个结果,我们在双重合并
上执行解释(true)
:
doubleCoalesce.explain(true)
我们通过以下步骤来构建最终的spark执行计划,以解决doubleCoalesce问题:
== Parsed Logical Plan ==
Repartition 8, false
+- Repartition 4, false
+- Project [value#1 AS MyCol#4]
+- LocalRelation [value#1]
...
== Physical Plan ==
Coalesce 8
+- LocalTableScan [MyCol#4]
我们可以看到,在解析的逻辑计划(对代码片段的简单解析)和物理计划(将应用于生成最终数据帧的计划)之间,只保留了最新的合并,因此实际上从未应用coalesce(4)
转换。因此,只有coalesce(8)
被应用,我们得到一个有8个分区的数据帧。
如果我们想要应用合并(4)
,我们需要在两个合并之间执行转换,例如复杂的选择
:
import org.apache.spark.sql.functions.col
val separatedCoalesce = simpleCoalesce
.select((col("MyCol") + 0).as("MyCol"))
.coalesce(8)
println(separatedCoalesce.rdd.getNumPartitions)
注意:简单选择为。选择(col(“MyCol”)
将不起作用,因为spark将在优化阶段放弃选择
此代码打印4
。如果我们看一下分离合并的物理计划:
== Physical Plan ==
Coalesce 8
+- *(1) Project [(MyCol#4 + 0) AS MyCol#9]
+- Coalesce 4
+- LocalTableScan [MyCol#4]
我们可以看到,这里应用了coalesce(4)
,因此最后我们得到了一个只有4个分区的数据帧,尽管我们应用了coalesce(8)
Spark优化可能很棘手,会让你觉得有些事情没有发生。所以请记住,spark并不是完全执行编写的代码,而是它的优化版本。
Coalesce for dataframe增加的分区数不能超过群集中的核心总数。
val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
h1b1Df.rdd.getNumPartitions // prints 8
val cloasedDf = h1b1Df.coalesce(21)
cloasedDf.rdd.getNumPartitions // prints 8
val cloasedDf1 = cloasedDf.coalesce(2) // prints 2
cloasedDf1.rdd.getNumPartitions
val cloasedDf2 = cloasedDf.coalesce(7) // prints 7
cloasedDf2.rdd.getNumPartitions
Coalesce可以通过设置shuffle=true来增加分区,这等于重新分区。当使用coalesce with shuffle=false来增加时,数据移动不会发生。所以一个分区的数据不能移动到另一个分区。而reduce只合并最近的分区。
谢谢,
本文向大家介绍topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?相关面试题,主要包含被问及topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?时的应答技巧和注意事项,需要的朋友参考一下 可以增加 bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-co
本文向大家介绍Kafka 分区数可以增加或减少吗?为什么?相关面试题,主要包含被问及Kafka 分区数可以增加或减少吗?为什么?时的应答技巧和注意事项,需要的朋友参考一下 我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。 Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,
问题内容: 有谁知道是否有任何执行以下操作的闩锁实现: 有一种减少锁存器值的方法,或者如果该值为零则等待 具有等待锁存器值为零的方法 具有将数字添加到锁存器值的方法 问题答案: 除了从AQS开始,您可以使用下面的简单实现。它有些天真(它是同步的,而AQS是无锁算法),但是除非您希望在满足条件的情况下使用它,否则它可能已经足够好了。 注意:我尚未对其进行深入测试,但它的行为似乎与预期的一样: 输出:
我有一个使用Kafka 1.0作为队列的应用程序。Kafka主题有80个分区和80个正在运行的使用者。(Kafkapython消费者)。 通过运行命令: 我看到其中一个分区被卡在一个偏移位置,并且随着新记录的添加,延迟会不断增加。 上面命令的输出如下所示: 这是什么原因?此外,不需要使用重置偏移量命令重置偏移量,因为可能不会定期手动监视此服务器。 客户端作为Linux m/c中的并行进程在后台运行
我不确定在进行聚合操作时应该增加还是减少分区数量。假设我正在使用pyspark数据框架。。 我知道行转换通常需要更多的分区。而将数据保存到磁盘通常需要fewere分区。 但是,对于聚合,我不清楚在中做什么?? 增加分区数的参数:由于我们必须为聚合而洗牌数据,因此您希望洗牌更少的数据,从而增加分区数,以减小分区的大小。 减少分区数量的论点:IT需要大量开销来收集和计算每个分区。因此,太多的分区将导致
Spark中的任务数由阶段开始时的RDD分区总数决定。例如,当Spark应用程序从HDFS读取数据时,Hadoop RDD的分区方法继承自MapReduce中的,它受HDFS块的大小、的值和压缩方法等的影响。 截图中的任务花了7,7,4秒,我想让它们平衡。另外,阶段被分成3个任务,有什么方法可以指定Spark的分区/任务数吗?