场景:我正在用spark streaming做一些测试。大约有100条记录的文件每25秒就出现一次。
问题:在程序中使用local[*]时,4核pc的处理时间平均为23秒。当我将相同的应用部署到16核服务器时,我期望处理时间有所改善。然而,我发现它在16个内核中也花费了同样的时间(我还检查了ubuntu中的cpu使用率,cpu得到了充分利用)。所有配置默认由spark提供。
问题:处理时间不应该随着可用于流作业的内核数量的增加而减少吗?
法典:
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
.set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(25))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)
jsonBuilder.append("[")
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
.foreachRDD { rdd =>
if (!rdd.partitions.isEmpty) {
val header = rdd.first().split(",")
val rowsWithoutHeader = Utils.dropHeader(rdd)
rowsWithoutHeader.foreach { row =>
jsonBuilder.append("{")
val singleRowArray = row.split(",")
(header, singleRowArray).zipped
.foreach { (x, y) =>
jsonBuilder.append(convertToStringBasedOnDataType(x, y))
// GEO Hash logic here
if (x.equals("GPSLat") || x.equals("Lat")) {
lattitude = y.toDouble
}
else if (x.equals("GPSLon") || x.equals("Lon")) {
longitude = y.toDouble
if (x.equals("Lon")) {
// This section is used to convert GPS Look Up to GPS LookUP with Hash
jsonBuilder.append(convertToStringBasedOnDataType("geoCode", GeoHash.encode(lattitude, longitude)))
}
else {
val selectedRow = broadcastTable.value
.filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
.withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
.orderBy("Distance")
.select("TrackKM", "TrackName").take(1)
if (selectedRow.length != 0) {
jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", selectedRow(0).get(0)))
jsonBuilder.append(convertToStringBasedOnDataType("TrackName", selectedRow(0).get(1)))
}
else {
jsonBuilder.append(convertToStringBasedOnDataType("TrackKm", "NULL"))
jsonBuilder.append(convertToStringBasedOnDataType("TrackName", "NULL"))
}
}
}
}
jsonBuilder.setLength(jsonBuilder.length - 1)
jsonBuilder.append("},")
}
sc.parallelize(Seq(jsonBuilder.toString)).repartition(1).saveAsTextFile("hdfs://localhost:9000/outputDirectory")
听起来您只使用了一个线程,如果是这样的话,应用程序是在具有4个内核还是16个内核的机器上运行都无关紧要。
听起来像是 1 个文件进来,1 个文件是 1 个 RDD 分区,有 100 行。您遍历该RDD中的行并附加jsonBuilder
。最后你调用 repartition(1)
这将使文件的写入成为单线程。
您可以在拾取文件后将数据集修复为12个RDD分区,以确保其他线程在这些行上工作。但除非我错过了什么,否则你很幸运这不会发生。如果两个线程正在调用<code>jsonBuilder,会发生什么。同时追加(“{”)?他们不会创建无效的JSON吗?我可能在这里遗漏了一些东西。
您可以通过添加如下日志来测试我对您的应用程序的单线程性的判断是否正确:
scala> val rdd1 = sc.parallelize(1 to 10).repartition(1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at repartition at <console>:21
scala> rdd1.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-40 => 1
Executor task launch worker-40 => 2
Executor task launch worker-40 => 3
Executor task launch worker-40 => 4
Executor task launch worker-40 => 5
Executor task launch worker-40 => 6
Executor task launch worker-40 => 7
Executor task launch worker-40 => 8
Executor task launch worker-40 => 9
Executor task launch worker-40 => 10
scala> val rdd3 = sc.parallelize(1 to 10).repartition(3)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at repartition at <console>:21
scala> rdd3.foreach{ r => {println(s"${Thread.currentThread.getName()} => $r")} }
Executor task launch worker-109 => 1
Executor task launch worker-108 => 2
Executor task launch worker-95 => 3
Executor task launch worker-95 => 4
Executor task launch worker-109 => 5
Executor task launch worker-108 => 6
Executor task launch worker-108 => 7
Executor task launch worker-95 => 8
Executor task launch worker-109 => 9
Executor task launch worker-108 => 10
我有一个用例,我必须以FIFO方式处理事件。这些是从机器生成的事件。每台机器每30秒生成一个事件。对于特定的机器,我们需要根据FIFO FASION对事件进行处理。 我们每天需要处理大约2.4亿个事件。对于如此大的规模,我们需要使用Kafka+火花流 从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进
当前设置:Spark流作业处理timeseries数据的Kafka主题。大约每秒就有不同传感器的新数据进来。另外,批处理间隔为1秒。通过,有状态数据被计算为一个新流。一旦这个有状态的数据穿过一个treshold,就会生成一个关于Kafka主题的事件。当该值后来降至treshhold以下时,再次触发该主题的事件。 问题:我该如何避免这种情况?最好不要切换框架。在我看来,我正在寻找一个真正的流式(一个
问题内容: 这是一个要阐述的问题:为什么说内核在进程地址空间中? 这可能是一个愚蠢的问题,但在我脑海中浮现出来。有关进程地址空间和虚拟内存布局的所有文字都提到进程地址空间具有为内核保留的空间。例如,在32位系统上,进程地址空间为4GB,其中1 GB为Linux中的内核保留(其他OS上可能有所不同)。 我只是想知道为什么当进程无法直接寻址内核时,为什么说内核位于进程地址空间中。为什么我们不说内核具有
问题内容: 我一直在阅读有关Python的多处理模块的信息。我仍然认为我对它可以做什么没有很好的了解。 假设我有一个四核处理器,并且我有一个包含1000000个整数的列表,我想要所有整数的总和。我可以简单地做: 但这仅将其发送到一个内核。 是否有可能使用多处理模块将数组划分为多个,并让每个核获得其部分的总和并返回值,以便可以计算总和? 就像是: 任何帮助,将不胜感激。 问题答案: 是的,可以对多个
我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查
需要进行一些运行时澄清。 在我读到的其他地方的一个线程中,有人说Spark Executor应该只分配一个核心。然而,我想知道这是否真的永远是真的。阅读各种so问题和诸如此类的问题,以及Karau、Wendell等人的著作,可以清楚地看到,有相同或相反的专家指出,在某些情况下,每个执行者应该指定更多的内核,但讨论往往更多的是技术性的,而不是功能性的。也就是说,缺少功能性的例子。 > 我的理解是RD