当前位置: 首页 > 知识库问答 >
问题:

Spark AQE 后随机分区合并无法按预期工作,甚至会使某些分区中的数据倾斜。为什么?

公良高刚
2023-03-14

我在我的spark DF上使用全局排序,当我启用AQE和后洗牌合并时,排序操作后我的分区变得比以前更差。

    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"

我的查询,在高层次上,看起来:

.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
    < li >可能导致歪斜的列-

我的第一个假设是,即使范围很小,也会有一个太大的峰值。但是我检查并确认按范围重新分区在记录中给了我很好的分布,但糟糕的是大小。我有近 200 个分区,记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。但是使用 AEQ 并重新分区到 18000 个小范围,最小的分区为 18mib,最大的分区为 1.8Gib。这种状态比没有AEQ要糟糕得多。需要强调的是,我使用的是 Spark UI 中的指标 -

所以我开始调试这个问题,但是AQE在ShufflePartitionsUtil.coalesce分区的输入和输出上没有足够的日志。这就是为什么我重写了我的查询到repartitionByRange.sort的分区。并使用额外的日志进行物理计划优化。我的日志告诉我,我最初的想法是对的。

  • 映射和写入混洗阶段后的输入分区被拆分为足够小
  • Coalesce算法将它们收集到一个正确的数字,并在字节分区中很好地分布。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435

而且

Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition  maxsize :312832323
Output partition min size :103832323

最小大小是如此不同,因为最后一个分区的大小,这是预期的。TRACE日志级别显示99%的分区接近290mib。

>

  • 但是为什么火花UI显示如此不同的结果?

    spark UI可能会出错吗?-

    也许吧,但除了任务大小,任务的持续时间也太大了,这让我觉得 Spark UI 还可以。

    因此,我的假设是,问题出在我的阶段的<code>MapOutputStatistics</code>。但它总是坏的还是只在我的情况下-

    只有在我的情况下?-

      < li >我从s3中读取了相同的数据集(块大小为120mb的拼花文件)
      < li >我从Kafka读取了相同的数据集,但是从分区函数中排除了倾斜的列-
    • 我尝试禁用缓存-
    • 我尝试禁用AQE并将18000个分区写入s3-

    所有这些检查使我认为< code>MapOutputStatistics只对我的情况是错误的。可能是如何与Kafka源相关的问题,或者是我的Kafka输入数据分布非常不均匀的问题。

    问题:

    • 那么有人知道我做错了什么吗?
    • 我可以对输入数据做些什么来使洗牌后的合并在我的情况下起作用?
    • 如果你认为我是对的,请发表评论。

    附言我还想提一下,我输入的 Kafka 数据帧是 2160,甚至不是分布式分区 -

  • 共有1个答案

    方高丽
    2023-03-14

    https://www.mail-archive.com/dev@spark.apache.org/msg26851.html

    下面是答案。

    在缓存数据中启用 AQE 的最坏情况不是失去使用/重用缓存的机会,而是如果输出分区碰巧在没有 AQE 的情况下匹配并且在 AQE 之后不匹配,则只是额外的随机播放。这种情况发生的可能性相当低。

     类似资料:
    • 我有一个2.5 GB的数据帧。分区数为5000。我正在尝试重新分区,然后将其持久化。但是在我读取持久化数据之后,分区的数量正在改变。 我甚至尝试使用coalesce,但没有运气。有人能解释一下发生了什么吗?

    • 场景: Kafka- 每个火花流微批次中的逻辑(30秒):< br >读取Json- 我的流媒体工作是阅读大约1000个Kafka主题,大约有10K个Kafkapartitions,吞吐量大约为500万事件/秒。 问题来自 Kafka 分区之间的流量负载不均匀,一些分区的吞吐量大约是较小分区的 50 倍,这会导致 RDD 分区倾斜(因为 KafkaUtils 创建了从 Kafka 分区到 Spar

    • 问题概要:假设我有300 GB的数据正在AWS中的EMR集群上用火花处理。这些数据有三个属性,用于在Hive中使用的文件系统上进行分区:日期、小时和(比方说)另一个。我想以最小化写入文件数量的方式将此数据写入fs。 我现在正在做的是获取日期、小时、另一个时间的不同组合,以及有多少行构成组合的计数。我将它们收集到驱动程序上的列表中,并遍历列表,为每个组合构建一个新的DataFrame,使用行数重新分

    • 我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100

    • 问题内容: 为什么下面的查询计划中包含表“ events_201504”?根据我的查询和对该表的检查约束,我希望查询计划程序能够完全修剪它: 时间和配置: 查询计划: 问题答案: 您的专栏是。 但是回报。该表达式被强制转换为,它带来 两个问题 : 1.) 您没有要求这个,但是表达是不可靠的。其结果取决于在其中执行查询的会话的当前时区设置。 为了使表达清晰,可以使用: 或者只是(在此处阅读手册):

    • 问题内容: 谁能解释两个我为什么这些代码不输出相同的结果(两个代码之间的唯一区别是run()方法)? 注意:第一个代码似乎没有做任何锁定! 第一个代码: 第二个代码: 该代码完全按照预期工作 问题答案: 第一码 事实是,您有3个线程实例,每个线程运行它自己的method实例。但是总是只有一个线程想要与其自己的方法同步,因此它将在线程希望其运行时运行。这根本没有同步。 第二码 您也有3个线程实例,但