我想了解火花流中的一个基本的东西。我有50个Kafka主题分区和5个执行者的数字,我正在使用DirectAPI所以没有。的RDD分区将为50个。这个分区将如何在5个执行器上处理?将在每个执行器上一次触发进程1个分区,或者如果执行器有足够的内存和内核,它将在每个执行器上并行处理超过1个分区。
将在每个执行器上一次触发进程1个分区,或者如果执行器有足够的内存和内核,它将在每个执行器上并行处理超过1个分区。
Spark将根据正在运行的作业可用内核的总量来处理每个分区。
假设您的流式作业有10个执行器,每个执行器有2个核。这意味着,假设spark.task.cpus
设置为1,您将能够并发处理10 x 2=20个分区。
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
这里的关键是executorDataMap
,它保存了从执行器id到executorData
的映射,该映射告诉系统中每个这样的执行器使用了多少核,并根据该核和分区的首选位置,对该任务应该运行哪个执行器进行有根据的猜测。
下面是一个使用Kafka的Spark流媒体应用的例子:
我们有5个分区,运行着3个执行器,其中每个执行器有2个以上的内核,这使得流可以并发地处理每个分区。
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的
我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不
我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?
我对spark有疑问:HDFS块vs集群核心vs rdd分区。 假设我正在尝试在HDFS中处理一个文件(例如块大小为64MB,文件为6400MB)。所以理想情况下它确实有100个分裂。 我的集群总共有 200 个核心,我提交了包含 25 个执行程序的作业,每个执行程序有 4 个核心(意味着可以运行 100 个并行任务)。 简而言之,我在rdd中默认有100个分区,100个内核将运行。 这是一个好方
我需要一些帮助来了解spark如何决定分区的数量,以及它们在executors中是如何处理的,我很抱歉这个问题,因为我知道这是一个重复的问题,但即使在阅读了许多文章后,我仍然不能理解我正在放上一个我目前正在工作的真实生活用例,以及我的Spark提交配置和集群配置。 我的硬件配置: < code>3节点计算机,总Vcores=30,总内存=320 GB。 我正在使用spark dataframe J
我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查