我在纱线簇中运行我的spark应用程序。在我的代码中,我使用队列的可用数量核心在数据集中创建分区:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题:如何通过编程方式而不是通过配置获取可用的队列核心数?
有多种方法可以从Spark获取执行程序的数量和集群中的核心数量。这是我过去使用的一些Scala实用程序代码。您应该可以轻松地使其适应Java。有两个关键思想:
工人人数是执行者人数减去一或sc.getExecutorStorageStatus.length - 1
。
每个工人的核心数可以通过java.lang.Runtime.getRuntime.availableProcessors
在一个工人上执行来获得。
其余代码是为SparkContext
使用Scala隐式添加便利方法的样板。我在1.x年前编写了代码,这就是为什么不使用的原因SparkSession
。
最后一点:合并多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。在实践中,我使用1.5倍至4倍之间的任何值,具体取决于数据大小以及作业是否在共享群集上运行。
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
更新资料
getExecutorStorageStatus
已被删除。我们已切换为使用SparkEnv
的`blockManager.master.getStorageStatus.length1(减号再次用于驱动程序)。正常的方式来得到它,通过
env的
SparkContext是没有的外部访问
org.apache.spark`包。因此,我们使用封装违规模式:
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}
问题内容: 如何从包含证书和私钥的PEM文件中以编程方式获取KeyStore?我试图在HTTPS连接中向服务器提供客户端证书。我已经确认,如果我使用openssl和keytool来获取jks文件(该文件是动态加载的),则客户端证书可以使用。我什至可以通过动态读取p12(PKCS12)文件来使其工作。 我正在考虑使用BouncyCastle的PEMReader类,但无法克服一些错误。我正在使用-Dj
我需要实现一个函数,该函数获取一个由n个元素和一个数字k组成的数组A作为输入,并返回一个包含大小为k的所有子集的数组(每个子集本身就是一个数组)。 定义函数的类型,并为函数实现至少3个测试(使用assert)。它应该是Javascript/Typescript和functional 例如:FunSubset([1,2,3],2)= 有什么想法吗?
问题内容: 我正在编写一个报告本地计算机上网络设备属性的应用程序。我需要mac地址,mtu,链接速度和其他一些信息。我为此使用udev。我已经弄清楚了如何获取mac地址和mtu,但还没有弄清楚链接速度。我可以从终端使用ethtool来获取它,但是我需要一种以编程方式获取它的方法。 有谁知道我如何获得udev或其他库的链接速度属性? 问题答案: 您需要使用ioctl()调用。在LinuxJourna
我有一个基于linux操作系统的自定义嵌入式系统。有摄像头连接到我的系统,我想使用谷歌驱动器作为云存储记录从我的摄像头。 3-然后我使用浏览器访问url,键入授权代码并允许手动身份验证。然后通过另一个HTTP请求获得access_token和refresh_token。 4-之后,我可以成功地使用给定access_token的任何api函数。(如果它过期,我会使用refresh_token刷新它)
问题内容: 我想在我的C ++应用程序中的特定位置强制进行核心转储。 我知道我可以通过执行以下操作来做到这一点: 但是我想知道是否有更清洁的方法? 我正在使用Linux。 问题答案: 提高信号号6(在Linux中)是一种方法(尽管请记住,并非所有POSIX实现中SIGABRT都 必须 为6,因此,如果不是quick’n,则可能需要使用值本身’脏调试代码)。 调用也会导致核心转储,你甚至可以做到这一
问题内容: 我想从受监视的应用程序内部以编程方式获得等效的输出。我看到可以通过HotSpot诊断Bean触发二进制堆转储,但是我看不到如何获取直方图数据。可能吗 ? 问题答案: 这可能是不是最好的例子/代码,但看看这个 (我认为这仅适用于Hotspot JVM)