当前位置: 首页 > 知识库问答 >
问题:

如何通过spark中的Futures保证集群资源的有效利用

公孙向荣
2023-03-14

我想在一个火花集群中并行运行多个火花SQL,这样我就可以广泛利用整个资源集群。我正在使用sqlContext.sql(查询)。

我在这里看到了一些示例代码,如下所示,

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
  Future{
    //spark stuff here
    0
  }(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit 

据我所知,ExecutionContext计算机器中的可用内核(使用ForkJoinPool)并相应地进行并行处理。但是,如果我们考虑的是spark集群而不是单机,那么会发生什么呢?它如何保证完全的集群资源利用率呢。?

例如:如果我有一个10节点的集群,每个集群有4个核心,那么上面的代码如何保证使用40个核心。

编辑:-

假设有2个sql要执行,我们有两种方法可以做到这一点,

>

  • 按顺序提交查询,以便仅在执行第一个查询后才能完成第二个查询。(因为sqlContext.sql(查询)是一个同步调用)

    使用Futures并行提交这两个查询,以便在集群中独立并行执行这两个查询,前提是有足够的资源(在这两种情况下)。

    我认为第二个更好,因为它使用了集群中可用的最大资源,如果第一个查询充分利用了资源,调度程序将等待作业完成(取决于策略),这在这种情况下是公平的。

    但是正如user9613318提到的“增加池大小会使驱动程序饱和”那么我如何有效地控制线程以更好地利用资源。

  • 共有1个答案

    郎宣
    2023-03-14

    并行性在这里的影响最小,额外的集群资源不会真正影响该方法<代码>未来(或线程)不是用来并行执行,而是用来避免阻塞执行。增加池大小只会使驱动程序饱和。

    您真正应该关注的是应用程序调度池中的Spark以及窄分区数量的调整(如何在Spark-SQL中更改分区大小,partitionColumn、lowerBound、upperBound、numPartitions参数的含义是什么?)和广度(spark.sql.shuffle.partitions的最佳值是什么,或者在使用spark-sql时如何增加分区?)转换。

    如果作业是完全独立的(代码结构表明),则最好使用自己的一组分配资源单独提交每个作业,并相应地配置集群调度池。

     类似资料:
    • 多集群资源即统一管理集群的命名空间、角色、集群角色等资源并将其映射到多个集群中。 命名空间 命名空间用于逻辑上隔离Kubernetes集群中的资源。 角色 角色定义了对集群的指定命名空间下资源的权限。 集群角色 集群角色定义了对集群下资源的权限。 角色绑定 角色绑定定义了角色绑定和服务账户的绑定关系。 集群角色绑定 集群角色绑定定义了集群角色和服务账户的绑定关系。

    • 为了管理异构和不同配置的主机,为了便于Pod的运维管理,Kubernetes中提供了很多集群管理的配置和管理功能,通过namespace划分的空间,通过为node节点创建label和taint用于pod的调度等。

    • 我可以确认使用spark shell连接到仪表盘,例如。 作品 但是 没有并给出错误

    • 我已经在Kubernetes上建立了Spark独立集群,并试图连接到Kubernetes上没有的Kerberized Hadoop集群。我已经将core-site.xml和hdfs-site.xml放在Spark集群的容器中,并相应地设置了HADOOP_CONF_DIR。我能够成功地在Spark容器中为访问Hadoop集群的principal生成kerberos凭据缓存。但是当我运行spark-s

    • 我想知道我正在使用的整个K8s集群中有哪些可用资源。 明确地说,我不是在谈论资源配额,因为它们只定义每个名称空间的资源。我想知道整个集群的功能是什么(内存、cpu等等)。请注意,所有资源配额的总和并不等于集群的能力。与集群的资源相比,总和可以更大(为名称空间之间的资源创建竞争条件)或更小(集群未充分利用其潜力)。 我能用kubectl回答这个问题吗?

    • 我正在开发一个服务,我想用它来监控标签和实施标签策略。 一个计划中的功能是检测带有相应键不允许的值的资源。 我已经可以列出具有特定标记键的资源的ARN,现在我希望根据无效值筛选此资源列表。要做到这一点,我想使用每个资源标签的ARN查询一个列表,然后根据标签中有无效值的资源进行过滤。 我有我想做一些类似来获取指定资源的标记。 我正在使用nodejs,但我很乐意使用基于AWS cli或任何其他可以在脚