当前位置: 首页 > 知识库问答 >
问题:

执行pyspark.sql.dataframe.take超过一个小时(4)

金承嗣
2023-03-14

我在3个VM上运行Spark1.6(即1x主服务器;2x从服务器),它们都有4个内核和16GB RAM。

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load()
four = df.take(4)
16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1)
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List()
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List()
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB)
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1)
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes)
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB)
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1)
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s

正如你所看到的,这需要很长的时间。我的表实际上相当大(存储了大约2.2亿行,每行11个字段),但是这样的查询可以立即使用“普通”sql(例如pyodbc)执行。

我想我没有理解/没有使用Spark,你会有这样的想法或建议来让它更好地工作吗?

共有1个答案

公良琛
2023-03-14

虽然Spark支持JDBC上的有限谓词下推,但所有其他操作,如limit、group、聚合都是在内部执行的。不幸的是,这意味着take(4)将首先获取数据,然后应用limit。换句话说,您的数据库将执行(假设没有投影和过滤器)以下内容:

SELECT * FROM table 

其余的将由Spark处理。其中涉及到一些优化(特别是Spark迭代地评估分区以获得limit请求的记录数),但与数据库端优化相比,它仍然非常低效。

如果要将limit推送到数据库,则必须静态地使用subquery作为dbtable参数:

(sqlContext.read.format('jdbc')
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....))
sqlContext.read.format("jdbc").options(Map(
  "url"     -> "xxxx",
  "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp",
))

一旦数据源API v2就绪,这种行为可能会在将来得到改进:

  • SPARK-15689
  • SPIP:数据源API v2
 类似资料:
  • 问题内容: 我正在使用goroutines /频道。这是我的代码。为什么超时情况没有得到执行? 问题答案: 您的超时不会发生,因为您的goroutine 之一会每1.5秒(或大约1.5秒)重复发送一次值在您的频道上,并且只有在2秒钟内没有接收到任何值时才会发生超时。 一旦从接收到一个值,在下一次迭代中将再次执行一个 新的 调用,该调用将返回一个 新的 通道,在该通道上将仅在另外2秒钟后发送一个值。

  • 问题内容: 我正在一个数据库很大的网站上工作。当时表中有1百万条记录。当我执行查询时,这将花费太多时间来执行。以下是一个示例查询: 每个查询都需要一分钟以上的时间,但是当我将表放到1万条记录中时,该查询就会快速执行。 正如我所读过的,在一个表中有一百万条记录没有问题,因为在数据库表中没有大记录的问题。 我已经通过堆栈溢出问题在表中使用了ID索引, 如何向MySQL表添加索引? ,但仍然有同样的问题

  • 我的表: 插入查询: 我的页面大小是16KB。因此,我的表中的一行最多可以包含8192字节(即8KB)。 我创建了11个列(每个255个字符),其中这11列最多可以容纳字符。 如果我存储2805-3字节的字符,它将需要(

  • 主要内容:1.组成,2. 大概流程,3.查询缓存,4.解析器 和 预处理器,5.优化器,6.存储引擎,7.总结1.组成 1.客户端 2.服务端:连接器、查询缓存、解析器、预处理器、优化器、执行器等,涵盖 MySQL 的大多数核心服务功能,以及所有的内置函数(如日期、时间、数学和加密函数等)。所有跨存储引擎的功能都在这一层实现,比如存储过程、触发器、视图等 3.存储引擎 2. 大概流程 1.MySQL server 层的 对来自客户端的连接进行验证,包含:1.用户名密码和ssl证书 2.datat

  • 问题内容: 我有一个包含650万条记录的MySQL表。当我尝试从phpMyAdmin访问该表时,我得到: 致命错误:在第1457行的C:\ xampp-new \ phpMyAdmin \ libraries \ display_tbl.lib.php中,超过30秒的最大执行时间。 我只是想查看记录,并且没有做任何可能导致错误的查询。 此问题仅在我的服务器上。而且我的本地计算机所包含的记录不如服务

  • 问题内容: 我一直在开发Android应用程序,我每小时需要执行1个任务。我使用以下代码: 它对我有用,但是我的客户告诉我该任务只能工作1次,而不能工作1个小时。我在哪里弄错了?请告诉我。谢谢。 问题答案: 根据您的代码,ALARM_PERIOD为1000L,作为重复间隔。因此,我怀疑警报会在每1000毫秒内触发一次。 如果您设置每小时的重复间隔,则应为3600000L。并请注意,如果电话重新启动