假设我每个执行器有36个核心,每个节点有一个执行器,以及3个节点,每个节点有48个可用核心。我注意到的基本要点是,当我将每个任务设置为使用1个内核(默认值)时,我对workers的CPU利用率约为70%,每个执行器将同时执行36个任务(正如我所预期的那样)。然而,当我将配置更改为每个任务有6个内核时(conf spark.task.cpus=6),每个执行器一次会减少到6个任务(如预期的那样),但我的CPU利用率也会下降到10%以下(意外的)。我假设Spark知道如何在6核上并行化工作负载。
重要的实现细节是,我正在数据帧的一列上运行一个UDF函数,并将结果作为新列附加到该数据帧上。此UDF函数使用一个瞬态对象,该对象提供我正在使用的机器学习算法。此UDF函数不是聚合或合并操作的一部分,它只是对列执行的映射操作,如下所示:
def myUdf = udf { ... }
val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)
我原以为Spark会执行6个myUdf,一次处理6条记录,每个核心一条,但事实似乎并非如此。有没有办法解决这个问题(无需向Spark项目提交PR),或者至少有人能解释为什么会发生这种情况?
预料到这个问题,我正在尝试增加每个任务的内核数量,以减少每个执行程序所需的RAM量。在这种情况下,一次执行太多任务会成倍增加RAM的使用量。
<代码>火花。任务CPU是为每个任务分配的多个核心。当用户代码为多线程时,它用于将多个内核分配给单个任务。如果您的udf不使用多个线程(不会在一个函数调用中产生多个线程),那么内核就是浪费了。
一次处理6条记录
使用spark分配6个核心。任务CPU设置为1。如果要限制节点上的任务数,请减少每个节点提供的核心数。
实际上,Spark可以通过在每个任务之间拆分记录(根据分区)并确定每个执行者可以同时处理多少个任务,自行决定如何在多个记录上同时拆分映射UDF。但是,Spark不能自动将每个核心的工作分配给每个任务。要在每个任务中使用多个核心,需要编写UDF中的代码,以便在单个记录上并行该UDF中的计算,UDF中的代码将在每个任务的一次(顺序)在一条记录上执行。
我有一个类,它基本上会做两次相同的步骤。听起来像是一个在哪里多线程处理程序的完美例子。我的问题是,如果我只需要两个线程就可以做到这一点。以下是一般情况 我已经完成了第一部分——构建对象——的工作。我现在的问题是- 如何让主线程等待两个线程完成其第一部分?也许main会在两个对象上执行等待,然后在线程notifyAll之后,它们会在主线程上执行等待?但是线程如何抓住主线程呢?也许用这个 我怎样才能在
多台机器生成事件。这些事件被发送到我们的Kafka集群,其中每台机器都有自己的主题(app.machine-events.machine-name)。因为顺序在每台机器的基础上很重要,而分区大小现在不是问题,所以所有主题都由一个分区组成。因此,目前,N个主题也意味着N个分区。 消费/处理应用程序使用了kafka-streams,我们给出了/“machine-event-processor”,它对每
问题内容: 我有一个可加载的内核模块,其初始化如下 我还启用了正在使用的内核版本上启用的动态调试-ie 。 在模块的Makefile中,我在其中添加了一行,即文件名。 现在,我在执行此模块的insmod后检查了一下,在其中发现了以下几行 即使做了所有这些,令我失望的是,在dmesg的输出中找不到上述两个pr_debug语句。那我想念什么或做错什么呢? 问题答案: 假设是模块源文件,请将以下内容添加
问题内容: 使用ant,maven和buildr有什么意义?在eclipse或netbeans中使用using构建会无法正常工作吗?我很好奇扩展构建工具的目的和好处是什么。 问题答案: 依赖管理 :构建工具遵循组件模型,该组件模型提供有关在哪里寻找依赖的提示。在Eclipse / Netbeans中,您必须依赖JAR,并且您实际上并不知道此JAR是否已更新。使用这些构建工具,它们“知道”依赖项中的
问题内容: 似乎应该已经问过这个问题,但是搜索没有发现任何问题。 我一直想知道让我们将所有代码放入类或接口的意义何在。我似乎记得,要求像C这样的函数有一些优点,但对于类却没有。像Python这样的语言在某种程度上比Java更面向对象,因为它们没有基元,但是您可以将代码放在任何需要的地方。 这是对OOP的某种“误解”吗?毕竟,您可以像在C语言中一样编写过程代码,并将其放在类中,但是不会面向对象。 问
我不太理解参数。在我看来,“任务”对应于执行器中的“线程”或“进程”。假设我将“spark.task.cpus”设置为2。 > 线程如何同时使用两个CPU?它不会需要锁并导致同步问题吗? 我正在查看部署/执行器/Executor.scala中的函数,在这里我看不到任何“每个任务的cpu数量”的概念。那么Spark最终在哪里/如何在独立模式下为任务分配多个cpu呢?