当前位置: 首页 > 知识库问答 >
问题:

Spark内部工作原理

梁丘逸仙
2023-03-14

我知道Spark可以使用Scala、Python和Java来操作。另外,RDDs用于存储数据

但是请解释一下,Spark的架构是什么,内部是如何工作的。

共有1个答案

汝志
2023-03-14

Spark围绕弹性分布式数据集(RDD)的概念展开,它是可以并行操作的元素的容错集合。RDDs支持两种类型的操作:转换,从现有的数据集创建新的数据集;操作,在对数据集运行计算后返回一个值给驱动程序。

Spark将RDD转换转换为DAG(有向无环图)并开始执行,

在高层,当在RDD上调用任何操作时,Spark创建DAG并提交给DAG调度器。

让我们来看看Spark如何构建DAG。

在高层,有两种变换可以应用到RDDs上,即窄变换和宽变换。宽的转换基本上会导致阶段边界。

狭义转换--不需要在分区之间洗牌数据。例如,映射筛选等。

INFO I'm Info message
WARN I'm a Warn message
INFO I'm another Info message
val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
                        .map(words => (words(0), 1))
                        .reduceByKey{(a,b) => a + b}

这个命令序列隐式地定义了一个RDD对象的DAG(RDD沿袭),以后调用操作时将使用该对象。每个RDD维护一个指向一个或多个父级的指针,以及关于它与父级的关系类型的元数据。例如,当我们在RDD上调用val b=a.map()时,RDDb保留对其父a的引用,这是一种沿袭。

为了显示RDD的沿袭,Spark提供了一个调试方法todebugstring()方法。例如,在SplitedLinesRDD上执行TodebugString()将输出以下内容:

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[5] at map at <console>:24 []
    |  MapPartitionsRDD[4] at map at <console>:23 []
    |  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
    |  log.txt HadoopRDD[0] at textFile at <console>:21 []

第一行(从下)显示输入RDD。我们通过调用sc.textfile()创建了这个RDD。请参见下面从给定的RDD创建的DAG图的更多图解视图。

构建DAG后,Spark scheduler将创建一个物理执行计划。如上所述,DAG调度器将图分成多个阶段,阶段是基于转换创建的。狭窄的转换将被分组(管道内衬)到一个单一的阶段。因此,对于我们的示例,Spark将创建一个两阶段执行,如下所示:

然后,DAG计划程序将阶段提交到任务计划程序中。提交的任务数取决于textfile中存在的分区数。Fox示例在这个示例中考虑我们有4个分区,那么如果有足够的从/核,将有4组任务并行创建和提交。下图更详细地说明了这一点:

要获得更详细的信息,我建议您浏览以下YouTube视频,在这些视频中,Spark的创建者提供了关于DAG和执行计划以及生命周期的深入细节。

    null
 类似资料:
  • 我有一个xml布局,它有以下视图:滚动视图->Relationvelayout->Some views+Tablayout+ViewPager->RecylerView(在ViewPager的片段中)。ViewPager有一些固定的高度(保持它“wrap_content”根本不会显示它)。现在的问题是Recylerview永远不会滚动。我已经尝试了几个已经发布的解决方案,比如在“嵌套滚动视图”中包

  • 我未能弄清楚Spark SQL join操作实际上是如何工作的。我读过相当多的解释,但它没有给一些问题带来光明。 例如,您有两个以Spark(parquet或任何其他格式)保存的数据库表。您必须根据一些列加入它们: 我将以的形式启动该查询 null 现在我必须连接大约50对巨大的表,它们中的每一个都应该被分成多个块(子集),比如说5个块。因此,我将获得块之间的筛选和联接操作,而不是

  • 我有一个关于这个连接器的问题。如果我的Spark集群和Cassandra集群不在同一个集群上,读取如何工作?Spark是否将整个Cassandra表带入自己的集群并将其重新排列到Spark分区中?

  • 问题内容: 请帮助我了解newFixedThreadPool(或Cached)的内部流程 当我们编写以下语句时,ExecutorService e = Executors.newFixedThreadPool(3); e.execute(runaable1); e.execute(runaable2); e.execute(runaable3); e.execute(runaable4); e.e

  • 请帮助我理解newFixedThreadPool(或Cached)的内部流 当我们编写下面的语句时,ExecutorService e=executors.NewFixedThreadPool(3); e.execute(runaable1); e.execute(runaable2); e.execute(runaable3); e.execute(runaable4); e.execute(r

  • 5个节点各有4个内核和32GB内存,其中一个节点(节点4)有8个内核和32GB内存。 所以我总共有6个节点-28个核,192GB RAM。(我想使用一半的内存,但要使用所有的内核) 计划在集群上运行5个spark应用程序。 我的spark\u默认值。配置如下: 我想在每个节点上使用16GB max,并通过设置以下配置在每台机器上运行4个工作实例。所以,我希望(4个实例*6个节点=24个)集群上的工