当前位置: 首页 > 知识库问答 >
问题:

如何优化火花sql并行运行

融宏伟
2023-03-14

我是spark新手,有一个简单的spark应用程序,使用spark SQL/hiveContext:

  1. 从hive表中选择数据(10亿行)
  2. 做一些过滤,聚合,包括row_number窗口函数来选择第一行,分组,计数()和最大()等。
  3. 将结果写入HBase(数亿行)

我提交的作业运行它在纱线集群(100个执行者),它很慢,当我在火花UI中查看DAG可视化时,似乎只有蜂巢表扫描任务并行运行,其余的步骤#2和#3只运行在一个实例中,可能应该能够优化并行化?

应用程序如下所示:

第一步:

val input = hiveContext
  .sql(
     SELECT   
           user_id  
           , address  
           , age  
           , phone_number  
           , first_name  
           , last_name  
           , server_ts   
       FROM  
       (     
           SELECT  
               user_id  
               , address  
               , age  
               , phone_number  
               , first_name  
               , last_name  
               , server_ts   
               , row_number() over 
                (partition by user_id, address,  phone_number, first_name, last_name  order by user_id, address, phone_number, first_name, last_name,  server_ts desc, age) AS rn  
           FROM  
           (  
               SELECT  
                   user_id  
                   , address  
                   , age  
                   , phone_number  
                   , first_name  
                   , last_name  
                   , server_ts  
               FROM  
                   table   
               WHERE  
                   phone_number <> '911' AND   
                   server_date >= '2015-12-01' and server_date < '2016-01-01' AND  
                   user_id IS NOT NULL AND  
                   first_name IS NOT NULL AND  
                   last_name IS NOT NULL AND  
                   address IS NOT NULL AND  
                   phone_number IS NOT NULL AND  
           ) all_rows  
       ) all_rows_with_row_number  
       WHERE rn = 1)

val input_tbl = input.registerTempTable(input_tbl)

第二步:

val result = hiveContext.sql(
  SELECT state, 
         phone_number, 
         address, 
         COUNT(*) as hash_count, 
         MAX(server_ts) as latest_ts 
     FROM  
    ( SELECT  
         udf_getState(address) as state  
         , user_id  
         , address  
         , age  
         , phone_number  
         , first_name  
         , last_name  
         , server_ts  
     FROM  
         input_tbl ) input  
     WHERE state IS NOT NULL AND state != ''  
     GROUP BY state, phone_number, address)

第三步:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)

如您所见,阶段0中的“过滤器”、“项目”和“交换”仅在一个实例中运行,阶段1和阶段2也是如此,因此,如果问题是愚蠢的,请回答几个问题并道歉:

  1. 过滤器,项目和交换是否发生在驱动程序中,从每个执行程序的数据洗牌后?
  2. 什么代码映射到过滤器,项目和交换?
  3. 如何并行运行过滤器、项目和交换来优化性能?
  4. 是否有可能并行运行第一阶段和第二阶段?

共有2个答案

张丁雷
2023-03-14

这是不明显的,所以我会做以下事情,以零的问题。

  1. 计算每个步骤的执行时间

希望你能把问题归零。

终子昂
2023-03-14

您没有正确读取DAG图——每个步骤都是使用单个框可视化的事实并不意味着它没有使用多个任务(因此也就是核心)来计算该步骤。

您可以通过向下钻取阶段视图来查看每个步骤使用了多少任务,该阶段视图显示了该阶段的所有任务。

例如,这里有一个与您类似的DAG可视化示例

您可以看到每个阶段都由“单个”步骤列描绘。

但如果我们看下表,我们可以看到每个阶段的任务数量:

其中一个只使用2个任务,而另一个使用220,这意味着数据被分割成220个分区,并且在提供足够可用资源的情况下并行处理分区。

如果你深入到这个阶段,你会再次看到它使用了220个任务和所有任务的细节。

只有从磁盘读取数据的任务才会在图中显示为具有这些“多个点”,以帮助您了解读取了多少文件。

正如拉希德的回答所建议的,检查每个阶段的任务数量。

 类似资料:
  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 本文向大家介绍如何进行SQL优化?相关面试题,主要包含被问及如何进行SQL优化?时的应答技巧和注意事项,需要的朋友参考一下 (1)选择正确的存储引擎 MyISAM 适合于一些需要大量查询的应用,但其对于有大量写操作并不是很好。甚至你只是需要update一个字段,整个表都会被锁起来,而别的进程,就算是读进程都无法操作直到读操作完成。另外,MyISAM 对于 SELECT COUNT(*) 这类的计算

  • 我们正在使用最新的Spark构建。我们有一个非常大的元组列表(8亿)作为输入。我们使用具有主节点和多个工作节点的docker容器运行Pyspark程序。驱动程序用于运行程序并连接到主机。 运行程序时,在sc.parallelize(tuplelist)行,程序要么退出并显示java堆错误消息,要么完全退出而不出错。我们不使用任何Hadoop HDFS层,也不使用纱线。 到目前为止,我们已经考虑了这

  • 在PySpark中或者至少在Scala中,Apache Spark中是否有与Pandas Melt函数等价的函数? 到目前为止,我一直在用Python运行一个示例数据集,现在我想对整个数据集使用Spark。

  • 我有一门课: 它运行得很好,但抛出了一个例外:在我对RDD的映射做了一个小更改之后: 我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?

  • 我已经在一个15节点的Hadoop集群上安装了。所有节点都运行和最新版本的Hadoop。Hadoop集群本身是功能性的,例如,YARN可以成功地运行各种MapReduce作业。 我可以使用以下命令在节点上本地运行Spark Shell,而不会出现任何问题:。 你知道为什么我不能用客户端模式在纱线上运行Spark Shell吗?