当前位置: 首页 > 知识库问答 >
问题:

为什么重新分区()方法会增加磁盘上的文件大小?

施飞雨
2023-03-14

我正在使用的一个数据湖(df)有2 TB的数据和20000个文件。我想把数据集压缩成2000 GB的文件。

如果运行df。coalesce(2000)并写入磁盘,数据湖包含1.9 TB的数据。

如果运行df。重新分区(2000)并写入磁盘,数据湖包含2.6 TB的数据。

数据湖中的每个文件比预期的大0.3 GB(它们都是1.3 GB的文件,而不是1 GB的文件)。

为什么repartition()方法会增加整个数据湖的大小?

还有一个相关的问题讨论了为什么在运行聚合后数据湖的大小会增加。答案是:

一般来说,当涉及到数据分布(数据组织)和单个列的基数时,像Parquet这样的列存储格式非常敏感。数据越有组织,基数越低,存储效率就越高。

coalesce()算法是否提供了更有组织的数据...我不这么认为...

我不认为另一个问题回答了我的问题。

共有2个答案

汪信鸥
2023-03-14

我同意@10465355的回答,这里我有一个极端的例子。

有一个名为table_a的表,它的所有列都是字符串。其存储格式为Orc,由

insert overwrite table table_a
select a,b,...,i
from table_other
group by a,b,...,i

HashAggregate操作之后,表_a中的数据组织得足够好。尤其是第一列a。orc文件的大小为6.97MB。(事实上,有一个2.09 KB的小文件,我后来忽略了它。)

然后,我们重新分区table_a。

val querydf = spark.sql("""select *
    from table_a distribute by rand()""").repartition(1)

querydf.createOrReplaceTempView("tmpTable")

spark.sql("""insert overwrite table table_a 
select a,b,...,i
from tmpTable""")

Numsubtions=1时,Random(hashing.byteswap32(index)). nextInt(NumPARtions)不会触发随机重新分配,因此我们添加了rand()分配的等价于重新分区(n),得到一个大小为14.26 MB的文件。

我们可以使用hive--orcfiledump来获取orc文件的文件结构。

重新分区之前

Stripes:
  Stripe: offset: 3 data: 7288854 rows: 668265 tail: 354 index: 13637
    Stream: column 0 section ROW_INDEX start: 3 length 50
    Stream: column 1 section ROW_INDEX start: 53 length 1706
    Stream: column 2 section ROW_INDEX start: 1759 length 672
    Stream: column 3 section ROW_INDEX start: 2431 length 2297
    Stream: column 4 section ROW_INDEX start: 4728 length 1638
    Stream: column 5 section ROW_INDEX start: 6366 length 1270
    Stream: column 6 section ROW_INDEX start: 7636 length 1887
    Stream: column 7 section ROW_INDEX start: 9523 length 1823
    Stream: column 8 section ROW_INDEX start: 11346 length 1120
    Stream: column 9 section ROW_INDEX start: 12466 length 1174
    Stream: column 1 section DATA start: 13640 length 209662
    Stream: column 1 section LENGTH start: 223302 length 1158
    Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
    Stream: column 2 section DATA start: 455788 length 29861
    Stream: column 2 section LENGTH start: 485649 length 5
    Stream: column 2 section DICTIONARY_DATA start: 485654 length 33
    Stream: column 3 section DATA start: 485687 length 424936
    Stream: column 3 section LENGTH start: 910623 length 4069
    Stream: column 3 section DICTIONARY_DATA start: 914692 length 41298
    Stream: column 4 section DATA start: 955990 length 443602
    Stream: column 4 section LENGTH start: 1399592 length 4122
    Stream: column 4 section DICTIONARY_DATA start: 1403714 length 56217
    Stream: column 5 section DATA start: 1459931 length 475983
    Stream: column 5 section LENGTH start: 1935914 length 2650
    Stream: column 5 section DICTIONARY_DATA start: 1938564 length 17798
    Stream: column 6 section DATA start: 1956362 length 480891
    Stream: column 6 section LENGTH start: 2437253 length 4230
    Stream: column 6 section DICTIONARY_DATA start: 2441483 length 27873
    Stream: column 7 section DATA start: 2469356 length 2716359
    Stream: column 7 section LENGTH start: 5185715 length 304679
    Stream: column 8 section DATA start: 5490394 length 438723
    Stream: column 8 section LENGTH start: 5929117 length 58072
    Stream: column 8 section DICTIONARY_DATA start: 5987189 length 424961
    Stream: column 9 section DATA start: 6412150 length 630248
    Stream: column 9 section LENGTH start: 7042398 length 1455
    Stream: column 9 section DICTIONARY_DATA start: 7043853 length 258641
    Encoding column 0: DIRECT
    Encoding column 1: DICTIONARY_V2[48184]
    Encoding column 2: DICTIONARY_V2[3]
    Encoding column 3: DICTIONARY_V2[4252]
    Encoding column 4: DICTIONARY_V2[4398]
    Encoding column 5: DICTIONARY_V2[4404]
    Encoding column 6: DICTIONARY_V2[5553]
    Encoding column 7: DIRECT_V2
    Encoding column 8: DICTIONARY_V2[105667]
    Encoding column 9: DICTIONARY_V2[60943]

重新分配后:

Stripes:
  Stripe: offset: 3 data: 14940022 rows: 668284 tail: 344 index: 12312
    Stream: column 0 section ROW_INDEX start: 3 length 50
    Stream: column 1 section ROW_INDEX start: 53 length 1755
    Stream: column 2 section ROW_INDEX start: 1808 length 678
    Stream: column 3 section ROW_INDEX start: 2486 length 1815
    Stream: column 4 section ROW_INDEX start: 4301 length 1297
    Stream: column 5 section ROW_INDEX start: 5598 length 1217
    Stream: column 6 section ROW_INDEX start: 6815 length 1841
    Stream: column 7 section ROW_INDEX start: 8656 length 1330
    Stream: column 8 section ROW_INDEX start: 9986 length 1289
    Stream: column 9 section ROW_INDEX start: 11275 length 1040
    Stream: column 1 section DATA start: 12315 length 4260547
    Stream: column 1 section LENGTH start: 4272862 length 15955
    Stream: column 2 section DATA start: 4288817 length 102153
    Stream: column 2 section LENGTH start: 4390970 length 5
    Stream: column 2 section DICTIONARY_DATA start: 4390975 length 33
    Stream: column 3 section DATA start: 4391008 length 1033345
    Stream: column 3 section LENGTH start: 5424353 length 4069
    Stream: column 3 section DICTIONARY_DATA start: 5428422 length 41298
    Stream: column 4 section DATA start: 5469720 length 1044769
    Stream: column 4 section LENGTH start: 6514489 length 4122
    Stream: column 4 section DICTIONARY_DATA start: 6518611 length 56217
    Stream: column 5 section DATA start: 6574828 length 1142805
    Stream: column 5 section LENGTH start: 7717633 length 2650
    Stream: column 5 section DICTIONARY_DATA start: 7720283 length 17798
    Stream: column 6 section DATA start: 7738081 length 1147888
    Stream: column 6 section LENGTH start: 8885969 length 4230
    Stream: column 6 section DICTIONARY_DATA start: 8890199 length 27873
    Stream: column 7 section DATA start: 8918072 length 1705640
    Stream: column 7 section LENGTH start: 10623712 length 208184
    Stream: column 7 section DICTIONARY_DATA start: 10831896 length 1525605
    Stream: column 8 section DATA start: 12357501 length 513225
    Stream: column 8 section LENGTH start: 12870726 length 58100
    Stream: column 8 section DICTIONARY_DATA start: 12928826 length 424905
    Stream: column 9 section DATA start: 13353731 length 1338510
    Stream: column 9 section LENGTH start: 14692241 length 1455
    Stream: column 9 section DICTIONARY_DATA start: 14693696 length 258641
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2
    Encoding column 2: DICTIONARY_V2[3]
    Encoding column 3: DICTIONARY_V2[4252]
    Encoding column 4: DICTIONARY_V2[4398]
    Encoding column 5: DICTIONARY_V2[4404]
    Encoding column 6: DICTIONARY_V2[5553]
    Encoding column 7: DICTIONARY_V2[378283]
    Encoding column 8: DICTIONARY_V2[105678]
    Encoding column 9: DICTIONARY_V2[60943]

Orc同时使用游程编码和字典编码来压缩数据。下面是编码字典_V2的含义。参考:ORCv1
|编码|流类型|可选|内容| | | | | | | | |字典| V2 |当前|是|布尔RLE | |数据|否|无符号整数RLE V2 | | | |字典|数据|否|字符串内容|长度|无符号整数||

在字典编码中,如果值是[“内华达州”、“加利福尼亚州”、“内华达州”、“加利福尼亚州”和“佛罗里达州”];字典的数据是“CaliforniaFloridaNevada”,长度是[10,7,6]。数据将是[2,0,2,0,1]。

无符号整数RLE v2也是在REF: ORCv1

在Hive 0.12中,ORC引入了运行长度编码版本2(RLEv2),该版本改进了压缩和固定比特宽度编码,以实现更快的扩展。RLEv2基于数据使用四个子编码:

  • 短重复-用于具有重复值的短序列
  • Direct-用于具有固定位宽的随机序列
  • 补丁基地-用于具有可变位宽的随机序列
  • Δ-用于单调增加或减少序列

让我们关注第一个专栏。

# before repartition
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Encoding column 1: DICTIONARY_V2[48184]

# after repartition
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Encoding column 1: DIRECT_V2

虽然我不知道Orc是如何选择编码的,但Orc认为在随机化后对第1列使用DIRECT_V2比使用DICTIONARY_V2节省更多空间。事实上,重新分区后,空间会变大近10倍。(4260547 15955)/(209662 1158 231328)

其他列的大多数编码没有改变,但大小有所增加。

重新分区VS合并
前者的文件大小是统一的,以避免数据倾斜
前者的数据量变大
*(潜在)*过滤混沌数据时不能使用ORC的行组索引。加入时,双方都需要重新洗牌。我使用上述数据来测试洗牌和排序之间的时间没有显著差异。

涂溪叠
2023-03-14

免责声明:

这个答案主要包含推测。对这种现象的详细解释可能需要对输入和输出(或者至少它们各自的元数据)进行深入分析

意见:

>

  • 熵有效地限制了最强的无损压缩性能——维基百科——熵(信息论)
  • 持久列格式和内部Spark SQL表示都透明地应用了不同的压缩技术(如运行长度编码或字典编码),以减少存储数据的内存占用。

    此外,磁盘上的格式(包括纯文本数据)可以使用通用压缩算法进行显式压缩——目前尚不清楚这里是否存在这种情况。

    压缩(显式或透明)应用于数据块(通常是分区,但可以使用较小的单元)。

    根据1)、2)和3)我们可以假设平均压缩率将取决于集群中数据的分布。我们还应该注意,如果上游谱系包含广泛的转换,最终结果可能是不确定的。

    合并重新划分的可能影响:

    通常,合并可以采取两种途径:

    • 通过管道升级到源头-这是最常见的场景。
    • 传播到最近的洗牌。

    在第一种情况下,我们可以预期压缩率将与输入的压缩率相当。然而,在某些情况下,最终产量可能会小得多。让我们想象一个退化的数据集:

    val df = sc.parallelize(
      Seq("foo", "foo", "foo", "bar", "bar", "bar"),
      6 
    ).toDF
    

    如果像这样的数据集被写入磁盘,就不会有压缩的可能——每个值都必须按原样写入:

    df.withColumn("pid", spark_partition_id).show
    
    +-----+---+
    |value|pid|
    +-----+---+
    |  foo|  0|
    |  foo|  1|
    |  foo|  2|
    |  bar|  3|
    |  bar|  4|
    |  bar|  5|
    +-----+---+
    

    换句话说,我们需要大约6*3个字节,总共18个字节。

    但是如果我们联合起来

    df.coalesce(2).withColumn("pid", spark_partition_id).show
    
    +-----+---+
    |value|pid|
    +-----+---+
    |  foo|  0|
    |  foo|  0|
    |  foo|  0|
    |  bar|  1|
    |  bar|  1|
    |  bar|  1|
    +-----+---+
    

    例如,我们可以使用带有小int的RLE作为计数,并将每个分区存储3个1字节,总共8个字节。

    这当然是一个巨大的过度简化,但说明了如何保持低熵输入结构,合并块可以降低内存占用。

    第二种合并方案不太明显,但也有一些方案可以通过上游过程降低熵(例如,考虑窗口函数),保留这种结构将是有益的。

    那么重新分区呢?

    如果不分区表达式重新分区适用于RoundRobin分区(实现为基于分区id的伪随机键的Hash分区)。只要哈希函数的行为合理,这种重新分配应该最大化数据的熵,从而降低可能的压缩率。

    结论:

    coalesce不应该单独提供任何特定的好处,而是可以保留数据分布的现有属性-该属性在某些情况下可能是有利的。

    重新划分,由于其性质,平均而言会让事情变得更糟,除非数据的熵已经最大化(这种情况下情况可能会改善,但在非平凡数据集上则极不可能)。

    最后,repartition使用分区表达式或repartitionByRange应该减少熵,并提高压缩率。

    注:

    我们还应该记住,列格式通常根据运行时统计信息决定特定的压缩/编码方法(或不确定)。因此,即使特定块中的行集合是固定的,但行的顺序发生了变化,我们也可以观察到不同的结果。

  •  类似资料:
    • 我正在尝试合并hdfs目录中小于512 mb的小文件。合并后,磁盘上的文件大小超过输入大小。有什么方法可以高效的控制大小。 重新分区导致了大量的混乱,输入文件是拼花格式的。

    • 本文向大家介绍Vmware添加磁盘的方法:扩展磁盘,包括了Vmware添加磁盘的方法:扩展磁盘的使用技巧和注意事项,需要的朋友参考一下 这篇文章介绍了一下如何在Vmare下添加或扩展磁盘并使之有效。 场景 创建Linux时分配磁盘空间随着使用的增加,使用率逐渐升高,此时需要在添加或者扩展一下磁盘。 比如:此Linux(CentOS 7.3)的磁盘为20G,目前已经使用到接近80% 扩展磁盘或者添加

    • 所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。 举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码: 然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按

    • 2.2 磁盘分区 这一章在规划的重点是为了要安装Linux,那Linux系统是安装在计算机元件的那个部分呢?就是磁盘啦!所以我们当然要来认识一下磁盘先。 我们知道一块磁盘是可以被分区成多个分区的(partition),以旧有的Windows观点来看,你可能会有一颗磁盘并且将他分区成为C:, D:, E:盘对吧!那个C, D, E就是分区(partition)啰。但是Linux的设备都是以文件的型态

    • 问题内容: 我不明白为什么 ‘chown’ 命令应该增加我的docker映像的大小? 以下Dockerfile创建大小为5.3MB的映像: 但是,此示例创建的图像大小为8.7MB: 为什么? 注意: 我的实际dockerfile当然比该示例长得多,因此映像大小的增加也很大。这就是为什么我什至在乎。 问题答案: Dockerfile中的每个步骤都会生成一个新的中间映像或“层”,该文件由文件系统中与上

    • null 如何在磁盘上区分它们?(以后与TensorFlowJS转换器一起使用) 每个模型是如何创建的?