Question:

Long query times from Spark to Cassandra

籍英叡
2023-03-14

Fellow developers: I'm running some basic analytics in Spark, where I query a multi-node Cassandra cluster. The code I'm running, together with the related setup code, is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

object cassSpark {
  def main(args: Array[String]): Unit = {

    // Note: the second set("spark.cassandra.connection.host", ...) overrides the first;
    // several contact points can instead be passed as one comma-separated value.
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "192.168.56.101")
      .set("spark.cassandra.connection.host", "192.168.56.102")
      .set("spark.cassandra.connection.local_dc", "datacenter1")
      .setMaster("local[*]")
      .setAppName("cassSpark")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    time {
      val df = sqlContext
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
        .load().cache()

      df.count()
    }

    // Simple wall-clock timer around a block of code.
    def time[A](f: => A) = {
      val s = System.nanoTime
      val ret = f
      println("time: " + (System.nanoTime - s) / 1e6 + "ms")
      ret
    }
  }
}

The Spark version is 1.6.0, Cassandra is v3.0.10, and the connector is also 1.6.0. The keyspace has a replication factor of 1, and the table has 5 columns and in fact only one row. As you can see, there are two nodes (virtual machines created in Oracle VM).
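(Purely for context, the exact schema is not shown in the question; a hypothetical keyspace and table matching this description — replication factor 1, five columns, one row — could be created through the connector's CassandraConnector as in the sketch below. The column names are invented for illustration only.)

import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

// Hypothetical schema: keyspace "nth" with replication factor 1 and a
// five-column table "nth1" holding a single row. Column names are made up.
val conf = new SparkConf().set("spark.cassandra.connection.host", "192.168.56.101")

CassandraConnector(conf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS nth " +
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS nth.nth1 " +
    "(id int PRIMARY KEY, c1 text, c2 text, c3 int, c4 double)")
  session.execute("INSERT INTO nth.nth1 (id, c1, c2, c3, c4) VALUES (1, 'a', 'b', 42, 3.14)")
}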

My problem is that when I measure the query time from Spark to Cassandra, I get a result of about 20 seconds, which does not seem normal to me since the table contains only one row. Am I missing something, am I measuring it wrong, or is there something wrong in my code? Can anyone help me, or tell me how to do this efficiently?

[EDIT]

As requested by @Artem Aliev, here is the full INFO log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/06 12:31:04 INFO SparkContext: Running Spark version 1.6.0
16/12/06 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/06 12:31:07 INFO SecurityManager: Changing view acls to: superbrainbug
16/12/06 12:31:07 INFO SecurityManager: Changing modify acls to: Ivan Majnaric
16/12/06 12:31:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(superbrainbug); users with modify permissions: Set(superbrainbug)
16/12/06 12:31:12 INFO Utils: Successfully started service 'sparkDriver' on port 62101.
16/12/06 12:31:14 INFO Slf4jLogger: Slf4jLogger started
16/12/06 12:31:14 INFO Remoting: Starting remoting
16/12/06 12:31:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:62114]
16/12/06 12:31:15 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62114.
16/12/06 12:31:15 INFO SparkEnv: Registering MapOutputTracker
16/12/06 12:31:15 INFO SparkEnv: Registering BlockManagerMaster
16/12/06 12:31:15 INFO DiskBlockManager: Created local directory at C:\Users\superbrainbug\AppData\Local\Temp\blockmgr-8b664e71-ead1-4462-b171-bf542a5eb444
16/12/06 12:31:15 INFO MemoryStore: MemoryStore started with capacity 1124.6 MB
16/12/06 12:31:18 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/06 12:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/06 12:31:20 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/12/06 12:31:20 INFO Executor: Starting executor ID driver on host localhost
16/12/06 12:31:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62136.
16/12/06 12:31:20 INFO NettyBlockTransferService: Server created on 62136
16/12/06 12:31:20 INFO BlockManagerMaster: Trying to register BlockManager
16/12/06 12:31:21 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62136 with 1124.6 MB RAM, BlockManagerId(driver, localhost, 62136)
16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager
16/12/06 12:31:23 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added
16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69
16/12/06 12:31:31 INFO DAGScheduler: Registering RDD 7 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Got job 0 (count at cassSpark.scala:69) with 1 output partitions
16/12/06 12:31:31 INFO DAGScheduler: Final stage: ResultStage 1 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 20.0 KB, free 20.0 KB)
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 29.2 KB)
16/12/06 12:31:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62136 (size: 9.2 KB, free: 1124.6 MB)
16/12/06 12:31:35 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69)
16/12/06 12:31:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/12/06 12:31:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 20008 bytes)
16/12/06 12:31:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/12/06 12:31:35 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/12/06 12:31:36 INFO GenerateUnsafeProjection: Code generated in 395.192925 ms
16/12/06 12:31:39 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 1168.0 B, free 30.3 KB)
16/12/06 12:31:39 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:62136 (size: 1168.0 B, free: 1124.6 MB)
16/12/06 12:31:39 INFO GeneratePredicate: Code generated in 35.193967 ms
16/12/06 12:31:39 INFO GenerateColumnAccessor: Code generated in 52.555004 ms
16/12/06 12:31:39 INFO GenerateMutableProjection: Code generated in 14.129896 ms
16/12/06 12:31:39 INFO GenerateUnsafeProjection: Code generated in 14.130749 ms
16/12/06 12:31:40 INFO GenerateMutableProjection: Code generated in 19.217034 ms
16/12/06 12:31:40 INFO GenerateUnsafeRowJoiner: Code generated in 72.736302 ms
16/12/06 12:31:40 INFO GenerateUnsafeProjection: Code generated in 133.407346 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3895 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5452 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ShuffleMapStage 0 (count at cassSpark.scala:69) finished in 5,542 s
16/12/06 12:31:41 INFO DAGScheduler: looking for newly runnable stages
16/12/06 12:31:41 INFO DAGScheduler: running: Set()
16/12/06 12:31:41 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/12/06 12:31:41 INFO DAGScheduler: failed: Set()
16/12/06 12:31:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.3 KB, free 39.6 KB)
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 44.1 KB)
16/12/06 12:31:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62136 (size: 4.6 KB, free: 1124.6 MB)
16/12/06 12:31:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/12/06 12:31:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1999 bytes)
16/12/06 12:31:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 92.969655 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 11.48414 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1830 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 602 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ResultStage 1 (count at cassSpark.scala:69) finished in 0,605 s
16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s
time: 19242.858679ms
16/12/06 12:31:48 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/12/06 12:31:48 INFO SparkContext: Invoking stop() from shutdown hook
16/12/06 12:31:48 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
16/12/06 12:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/12/06 12:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/06 12:31:48 INFO MemoryStore: MemoryStore cleared
16/12/06 12:31:48 INFO BlockManager: BlockManager stopped
16/12/06 12:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/06 12:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/06 12:31:48 INFO SparkContext: Successfully stopped SparkContext
16/12/06 12:31:48 INFO ShutdownHookManager: Shutdown hook called
16/12/06 12:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\superbrainbug\AppData\Local\Temp\spark-17606f05-6bb8-4144-acb5-015f15ea1ea9

Process finished with exit code 0

Also, @Yves DARMAILLAC, if I remove .cache() the running time is 12 seconds.

2 Answers

穆展鹏
2023-03-14

Why do you cache the result of load()? It's unnecessary, since you don't reuse it. Is the time still the same without the cache?

Try this:

time {
  val df = sqlContext
    .read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
    .load()

  df.count()
}
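As a side note, caching only pays off when the same DataFrame feeds more than one action; a minimal sketch of that situation (the id column in the filter is hypothetical, used purely for illustration) would be:

val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
  .load()
  .cache()

df.count()                    // first action: reads from Cassandra and fills the cache
df.filter("id = 1").count()   // second action: served from the cached data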
史良哲
2023-03-14

20 seconds looks a bit larger than expected for your environment; it should be around 2 seconds. Could you enable INFO-level Spark logging and post the results here?

The main time consumer in your example is initialization; a second run will be much faster (see the sketch after the list below):

• Spark is a lazy framework, so it initializes a lot of things on the first query. You are using the 'local' master, so it will be even slower when you run it on a Spark cluster.

• The connector is optimized for bulk queries, so it does a lot of preparation work up front to run bulk queries faster: it queries the C* cluster metadata for table sizes and token-range locations, creates splits, and establishes C* connections.
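To separate that one-time initialization from the per-query cost, one option is to time the same count twice within a single run; a minimal sketch, reusing the question's time helper, sqlContext, and table, might look like this:

val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
  .load()

time { df.count() }   // first measurement: includes metadata queries, split creation, connections
time { df.count() }   // second measurement: closer to the steady-state query cost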
