当前位置: 首页 > 知识库问答 >
问题:

Apache spark如何计算分区以及如何在执行器中处理分区

阚原
2023-03-14

我需要一些帮助来了解spark如何决定分区的数量,以及它们在executors中是如何处理的,我很抱歉这个问题,因为我知道这是一个重复的问题,但即使在阅读了许多文章后,我仍然不能理解我正在放上一个我目前正在工作的真实生活用例,以及我的Spark提交配置和集群配置。

我的硬件配置:

< code>3节点计算机,总Vcores=30,总内存=320 GB。

spark-submit config:

spark-submit \
--verbose \
--master yarn \
--deploy-mode cluster \
--num-executors 1  \
--executor-memory 3g \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.driver.memory=1000m \
--conf spark.speculation=true \

我正在使用spark dataframe Jdbc api从MySql数据库读取:

val jdbcTable= sqlContext.read.format("jdbc").options(
            Map(
              "url" -> jdcbUrl,
              "driver" -> "net.sourceforge.jtds.jdbc.Driver",
              "dbtable" ->
                s"(SELECT * FROM SOMETHING WHERE COLUMN > ${lastExtractUnixTime}) as t"))
            .load

jdbcTable DATAFRAME 创建的分区总数为 200

问题:

> < Li > < p > spark如何实现< code>200分区,这是默认设置吗?

因为我只有1个执行器,所以200分区是在单个执行器中并行处理还是一次处理一个分区?

执行器核心是否用于处理具有配置并发性(即 2(在我的情况下)的每个分区中的任务?

共有1个答案

隗高旻
2023-03-14

>

  • 正如现在编写的那样,Spark将只使用1个分区
  • 如果您看到200个分区,这意味着:

    • 代码中没有显示后续的洗牌(交换)。
    • 您使用spark.sql.shuffle.partitions的默认值。

    并行性将取决于执行计划和分配的资源。它不会高于min(分区数、火花核数)。如果有一个执行器,那么集群管理器将分配给该执行器的线程数。

  •  类似资料:
    • spark如何给一个执行器分配一个分区? 当我使用 1 个驱动程序和 5 个执行器在火花外壳中运行以下行时: 重新分区后,10个分区仍然位于原来的两个节点上(总共5个)。这似乎非常低效,因为5个任务在包含分区的每个节点上重复运行,而不是平均分布在节点上。在同一个rdds上重复多次的迭代任务中,效率低下最为明显。 所以我的问题是,Spark如何决定哪个节点具有哪个分区,有没有办法强制将数据移动到其他

    • 我们使用的是partitioner,它用@Scope(value=“step”)注释,还有setter方法,用@beforstep注释,但framewowrk仍然没有注入step执行对象? 我们做错了什么

    • 我想了解火花流中的一个基本的东西。我有50个Kafka主题分区和5个执行者的数字,我正在使用DirectAPI所以没有。的RDD分区将为50个。这个分区将如何在5个执行器上处理?将在每个执行器上一次触发进程1个分区,或者如果执行器有足够的内存和内核,它将在每个执行器上并行处理超过1个分区。

    • 问题内容: 我有一个典型的客户/订单中设置的表,我想显示总 百分比 销售的特定客户负责。我可以像这样获得系统中的订单总数: 这样我就可以得到客户发出的订单总数: 如何将它们组合成一个查询,以返回特定客户的销售百分比?谢谢! 问题答案: MySQL: 编辑 我想如果我注意到 postgres 标签会有所帮助。我认为这是一个MySQL问题。 PostgreSQL: PS我的PostgreSQL生锈了,

    • 我使用Spark 2.1.1。 我使用结构化流从2个Kafka分区读取消息。我正在向Spark Standalone集群提交我的应用程序,其中有一个工人和两个执行者(每个2个核心)。 我想要这样的功能,来自每个Kafka分区的消息应该由每个单独的执行器独立处理。但现在正在发生的是,执行器分别读取和映射分区数据,但在映射之后,形成的无边界表被普遍使用,并且具有来自两个分区的数据。 当我对表运行结构化

    • 当生成消息到Kafka时,您可以得到两种错误:可检索和不可检索。在处理它们时,你应该如何区分它们? 我希望异步生成记录,将接收到不可重试异常的记录保存在另一个主题(或HBase)中,并让生产者为我处理所有接收到可重试异常的记录(最多尝试次数,当它最终到达时,会成为第一批异常之一)。 我的问题是:尽管有,但生产者是否仍会自行处理可检索的异常?因为在接口回调中说: 可重试异常(暂时的,可通过增加#.重