在减少分区数量时,可以使用coalesce,这很好,因为它不会引起混乱,而且似乎可以立即工作(不需要额外的作业阶段)。
有时我想做相反的事情,但重新分区会导致混乱。我想几个月前,我实际上通过使用balanceSlack=1.0的CoalescedRDD实现了这一点-所以会发生的是,它会分割一个分区,从而得到所有分区都位于同一节点上的分区位置(如此小的网络IO)。
这种功能在Hadoop中是自动的,只需调整拆分大小。除非减少分区数量,否则它在Spark中似乎无法以这种方式工作。我认为解决方案可能是编写一个自定义分区程序以及一个自定义RDD,我们在其中定义了getPreferredLocations
...但我认为这是一件如此简单和常见的事情,肯定有一种直接的方法可以做到吗?
尝试过的事情:
<代码>。在我的SparkConf上设置(“spark.default.parallelism”,partitions),并且在阅读拼花的上下文中,我尝试了sqlContext。sql(“set spark.sql.shuffle.partitions=…”,它在1.0.0上会导致一个错误,并不是我真正想要的,我希望分区号在所有类型的作业中都能更改,而不仅仅是洗牌。
正如你所知,pyspark使用某种“懒惰”的跑步方式。它只会在有一些操作要执行时进行计算(例如“df.count()”或“df.show()”。因此,您可以在这些操作之间定义一个无序分区。
你可以写:
sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=100")
# you spark code here with some transformation and at least one action
df = df.withColumn("sum", sum(df.A).over(your_window_function))
df.count() # your action
df = df.filter(df.B <10)
df = df.count()
sparkSession.sqlContext().sql("set spark.sql.shuffle.partitions=10")
# you reduce the number of partition because you know you will have a lot
# less data
df = df.withColumn("max", max(df.A).over(your_other_window_function))
df.count() # your action
我不太明白你的意思。您的意思是现在有5个分区,但在下一次操作之后,您希望将数据分发到10个分区?因为有10个,但仍然使用5没有多大意义……向新分区发送数据的过程必须在某个时候发生。
在执行合并时,可以去掉未指定的分区,例如:如果最初有100个分区,但在reduceByKey之后有10个分区(因为只有10个键),可以设置合并。
如果您想让流程走另一条路,可以强制进行某种分区:
[RDD].partitionBy(new HashPartitioner(100))
我不确定你在找什么,但希望如此。
注意这个空间
https://issues.apache.org/jira/browse/SPARK-5997
这种非常简单的显而易见的特性最终会被实现——我想就在他们完成了数据集中所有不必要的特性之后。
Spark中的任务数由阶段开始时的RDD分区总数决定。例如,当Spark应用程序从HDFS读取数据时,Hadoop RDD的分区方法继承自MapReduce中的,它受HDFS块的大小、的值和压缩方法等的影响。 截图中的任务花了7,7,4秒,我想让它们平衡。另外,阶段被分成3个任务,有什么方法可以指定Spark的分区/任务数吗?
我试图理解和之间的区别。 如果我正确理解了这个答案,只能减少dataframe的分区数量,如果我们尝试增加分区数量,那么分区数量将保持不变。 但当我试图执行下面的代码时,我发现了两件事 对于合并的Dataframe,可以增加分区数 对于Rdd,如果Shuffle=false,那么分区的数量不能随着合并而增加。 这是否意味着使用合并数据帧分区可以增加? 当我执行以下代码时: 我得到以下输出 当我执行
我被一些行为弄糊涂了,不知道是否有人能帮忙。我有一个React组件,它根据通过道具传入的过滤器获取电影数据。使用控制台。日志我可以看到我的componentDidMount()只被调用一次,但是每次由于接收到不同的道具而重新呈现组件时,仅在componentDidMount()中设置的状态变量都会更改。我的代码很长,所以我不想全部发布,但如果需要,我可以发布。下面是引起我困惑的片段: 尽管如此。过
我正在使用名称:kafka2.12版本:2.3.0。根据流量/负载,我想更改主题的最大分区数。Kafka一上来,有没有可能做这种改变,用代码能做到吗?
问题内容: 我在Google和此站点中搜索了我的问题,但我仍然不明白该解决方案。 我有一段程序里面有一些数据。程序在大型阵列上崩溃,并显示虚拟内存不足的错误,因此我开始考虑文件。 在此之前: 之后: 我在Ubuntu上进行了测试,并通过系统监视器看到了这种内存增加。但是我很困惑,(和)参数没有变化。 问题是-实际内存使用量的指标是什么? 这是否表示真实指标是?(并且仅分配但仍未使用的内存) 问题答
我有一个使用Kafka 1.0作为队列的应用程序。Kafka主题有80个分区和80个正在运行的使用者。(Kafkapython消费者)。 通过运行命令: 我看到其中一个分区被卡在一个偏移位置,并且随着新记录的添加,延迟会不断增加。 上面命令的输出如下所示: 这是什么原因?此外,不需要使用重置偏移量命令重置偏移量,因为可能不会定期手动监视此服务器。 客户端作为Linux m/c中的并行进程在后台运行