当前位置: 首页 > 知识库问答 >
问题:

火花数据帧按平均值和中值分组不完整

尚楚
2023-03-14

我使用Spark sql dataframes执行groupby操作,然后计算每组数据的平均值和中值。原始数据量约为1 TB。

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"), 
        avg("Error").as("MeanError"), 
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"), 
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"), 
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
    filter($"Count" > 1000)


df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

当我运行该查询时,我的工作被卡住,无法完成。如何调试该问题?是否存在导致groupby()卡滞的按键不平衡?

共有1个答案

蓬弘
2023-03-14

评论中已经有很多合理的建议,但值得一提的是我的想法:

1) 执行df。计算工时?如果不是,您的问题就出现在您发布的代码之前(如注释中所示)

2) 查看Spark UI(如评论中所建议的)-大多数任务完成得很快,而少数任务需要很长时间/看起来卡住了吗?如果是这样,歪斜很可能是你的问题

3) 您可以重写查询,首先只查找每个“id”的“count”。接下来,过滤原始df,使其仅包含id通过广播(避免df乱序)内部连接出现1000次以上的行(如果出现次数不超过1000次的id太多)。然后聚合这个较小的数据帧并计算所有统计数据。如果计数聚合有效,则输出还应显示是否存在任何显著的数据倾斜!

4)有时将计算分解为更小的步骤,然后写入,然后立即从磁盘读取,这在过去帮助我完成了尴尬的工作。如果首先生成df成本很高,也可以使调试更快。

5) 绝对值得点燃火花。sql。洗牌分区(如注释所示);2001是spark中的一个神奇数字(spark.sql.shuffle.partitions的最佳值应该是多少,或者在使用spark sql时我们如何增加分区?)

6)我也会尝试改变数据量,如果你只使用一周中的一天=1,它是否有效(如评论中建议的)

7)查询是否在没有percentile_approx的情况下运行?

 类似资料:
  • 我试图计算数据中几列(第一列除外)的平均值和标准差。具有<code>NA<code>值的帧。 我试过< code>colMeans、< code > sappy 等。,创建一个循环,遍历data.frame,然后将平均值和标准偏差存储在一个单独的表中,但不断得到一个“有趣的”错误。任何帮助都是巨大的。谢谢 一个

  • 示例数据: 我想计算每个唯一列名称的平均值和标准偏差(忽略NA)以获得如下输出: 可再现数据: 我得到的最接近的是 这是我从这篇文章中得到的,但我不知道如何调整它来得到我想要的。我知道我可以取rowmeans的平均值来得到每组的平均值,但这不适用于标准差。

  • 这个问题与这个主题有关: Spark 2.2 Scala 数据帧从字符串数组中选择,捕获错误 我需要区分缺少列的记录(这在我的用例中不是错误)和具有不适用于列类型的垃圾值的记录。 在执行selectExpr之后,这两种情况在结果数据帧中都显示为null。我正在寻找一种快速的方法,将缺少列的记录包含在好的结果中,同时将具有垃圾值的记录放入坏桶中。不好的可能包括像一个值为空字符串的int字段,或者“a

  • 我有列。 如何根据值将其拆分为2? 第一个将包含

  • [新加入Spark]语言-Scala 根据文档,RangePartitioner对元素进行排序并将其划分为块,然后将块分发到不同的机器。下面的例子说明了它是如何工作的。 假设我们有一个数据框,有两列,一列(比如“a”)的连续值从1到1000。还有另一个数据帧具有相同的模式,但对应的列只有4个值30、250、500、900。(可以是任意值,从1到1000中随机选择) 如果我使用RangePartit

  • 我处理了像这样存储的双精度列表: 我想计算这个列表的平均值。根据文档,: MLlib的所有方法都使用Java友好类型,因此您可以像在Scala中一样导入和调用它们。唯一的警告是,这些方法采用Scala RDD对象,而Spark Java API使用单独的JavaRDD类。您可以通过对JavaRDD对象调用.RDD()将JavaRDD转换为Scala RDD。 在同一页面上,我看到以下代码: 根据我