当前位置: 首页 > 知识库问答 >
问题:

规范化的SPARK RDD分区,使用减法或重新分区

壤驷高洁
2023-03-14

使用Spark 2.4.0。我的生产数据非常歪斜,因此其中一项任务的时间是其他任务的7倍。我尝试了不同的策略来规范数据,以便所有执行者都能平等工作-

  1. spark.default。并行性
  2. reduceByKey(numPartitions)
  3. 重新分区(numPartitions)

我的期望是这三个选项应该均匀分区,但是在Spark Local/Standalone上使用一些虚拟的非生产数据表明,选项1、2比3更正常。

数据如下:

account}date}ccy}amount
A1}2020/01/20}USD}100.12
A2}2010/01/20}SGD}200.24
A2}2010/01/20}USD}300.36
A1}2020/01/20}USD}400.12

预期结果应该是[A1-USD,500.24],[A2-SGD,200.24],[A2-USD,300.36]理想情况下,这些应该划分为3个不同的分区。

javaRDDWithoutHeader
.mapToPair((PairFunction<Balance, String, Integer>) balance -> new Tuple2<>(balance.getAccount() + balance.getCcy(), 1))        
    .mapToPair(new MyPairFunction())
   .reduceByKey(new ReductionFunction())

检查分区的代码

     System.out.println("b4 = " +pairRDD.getNumPartitions());
     System.out.println(pairRDD.glom().collect());
     JavaPairRDD<DummyString, BigDecimal> newPairRDD = pairRDD.repartition(3);
     System.out.println("Number of partitions = " +newPairRDD.getNumPartitions());
     System.out.println(newPairRDD.glom().collect());

>

  • 选项1:什么都不做
  • 选项2:将spark.default.parallelism设置为3
  • 选项3:使用Num分区=3的减除ByKey
  • 选项4:重新分区(3)

    对于选项1分区数=2[[[(DummyString{帐户='A2', ccy='SGD'},200.24),(DumMyString{帐户='A2', ccy='USD'},300.36)],[(DumMyString{帐户='A1', ccy='USD'},500.24)]]

    对于备选方案2

    分区数 = 3 [ [(DummyString{account='A1', ccy='USD'},500.24)], [(DummyString{account='A2', ccy='USD'},300.36)], [(DummyString{account='A2', ccy='SGD'},200.24)]]

    对于选项 3 分区数 = 3 [ [(DummyString{account='A1', ccy='USD'},500.24)], [(DummyString{account='A2', ccy='USD'},300.36)], [(DummyString{account='A2', ccy='SGD'},200.24)]

    对于选项 4 分区数 = 3 [[], [(DummyString{ account='A2', ccy='SGD'},200.24)], [(DummyString{ account='A2', ccy='USD'},300.36), (DummyString{ account='A1', ccy='USD'},500.24)]]

    结论:选项 2(spark.default.parallelism) 和 3(reduceByKey(numPartitions) 规范化比选项 4(重新分区)好得多 相当确定的结果,从未看到选项4规范化为 3 个分区。

    问题:

    1. 是否比重新分区或
    2. 这只是因为样本数据集太小了吗?或者
    3. 当我们通过YARN集群提交时,这种行为会有所不同吗
  • 共有1个答案

    南门向荣
    2023-03-14

    我认为这个问题中有一些东西贯穿始终,因此更难回答。

    首先,存在与静态数据相关的分区和并行性,因此在读取时也是如此;在不让海洋重新沸腾的情况下,这里有一个极好的SO答案来解决这个问题:当spark的主内存无法容纳文件时,spark如何读取一个大文件(PB)。在任何情况下,都没有散列或任何正在进行的事情,只是“原样”。

    此外,与DFs相比,rdd没有得到很好的优化。

    Spark中的各种操作会在一个动作被调用后导致洗牌:

    • reduceByKey将使用散列进行最终聚合和本地分区聚合,这将导致更少的混洗
    • 重新分区,使用随机性
    • partitionBy(new HashPartitioner(n))等,您没有提到
    • reduceByKey(聚合函数,N个分区),奇怪的是,它比重新分区更有效

    您的后一条评论通常暗示了数据的偏差。对于reduceByKey,太多条目哈希到同一个“桶”/分区。缓解方法:

    >

  • 一般来说,尝试预先使用大量分区(读入时) - 但我在这里看不到您的转换和方法,因此我们将其作为一般建议。

    一般来说,尝试使用合适的哈希在前面(读入时)使用更多的分区,但我看不到这里的转换和方法,所以我们将此作为一般建议。

    或者在某些情况下,通过添加后缀来“加盐”密钥,然后 reduceByKey 并再次将 reduceByKey “取消盐化”以获取原始密钥。取决于所花费的额外时间与保持原样或执行其他选项。

    repartition(n) 应用随机排序,因此您洗牌,然后需要再次洗牌。不必要的海事组织。正如另一篇文章所示(请参阅对您的问题的评论),它看起来像是不必要的工作,但这些都是旧式的RDD。

    顺便说一句,使用数据帧更容易做到。

    由于我们不了解您的完整编码,希望对您有所帮助。

  •  类似资料:
    • 我有一个非规范化用例——一个hiveavro事实表与14个较小的维度表连接,生成一个非规格化拼花输出表。输入事实表和输出表都以相同的方式进行分区(Category=TEST1,YearMonthId=202101)。我确实运行历史处理,这意味着一次处理并加载给定类别的几个月。 我使用的是Spark 2.4.0/pyspark数据帧,所有表连接的广播连接,动态分区插入,最后使用colasce来控制输

    • 我试图使用JPA规范编写以下查询。从hcp中选择Distincent name,其中区域='Dhaka'; hcp实体如下所示 桌子应该是这样的 所需的结果将根据区域列出不同的名称。如何使用JPA规范在特定字段上应用distinct? 所需的查询和输出:

    • 我不确定在进行聚合操作时应该增加还是减少分区数量。假设我正在使用pyspark数据框架。。 我知道行转换通常需要更多的分区。而将数据保存到磁盘通常需要fewere分区。 但是,对于聚合,我不清楚在中做什么?? 增加分区数的参数:由于我们必须为聚合而洗牌数据,因此您希望洗牌更少的数据,从而增加分区数,以减小分区的大小。 减少分区数量的论点:IT需要大量开销来收集和计算每个分区。因此,太多的分区将导致

    • 我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么

    • 步骤3我通过for循环加载每个分区,执行聚合,并以追加模式将其保存为文件夹,这样我就有9个模块作为文件夹:、等。它们不按模块分区,只是保存为文件夹。由于我的默认spark numpartitions是,每个模块文件夹都有文件,因此总共有文件 步骤4到目前为止还不错,但是我需要按把它分区回来。因此,我循环遍历每个分区,并将文件保存为一个没有任何分区的parquet文件。这导致总共有文件。我不知道这是

    • 根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同