当前位置: 首页 > 知识库问答 >
问题:

镶木地板文件大小,消防软管与火花

宋弘壮
2023-03-14

我通过两种方法生成拼花地板文件:动弹消防软管和火花作业。它们都被写入S3上相同的分区结构中。两组数据都可以使用相同的Athena表定义进行查询。两者都使用gzip压缩。

然而,我注意到Spark生成的拼花地板文件大约是Firehose生成的拼花地板文件的3倍大。有什么理由会这样吗?在使用Pyarrow加载模式和元数据时,我确实注意到了一些差异:

>>> import pyarrow.parquet as pq
>>> spark = pq.ParquetFile('<spark object name>.gz.parquet')
>>> spark.metadata
<pyarrow._parquet.FileMetaData object at 0x101f2bf98>
  created_by: parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828)
  num_columns: 4
  num_rows: 11
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1558
>>> spark.schema
<pyarrow._parquet.ParquetSchema object at 0x101f2f438>
uri: BYTE_ARRAY UTF8
dfpts.list.element: BYTE_ARRAY UTF8
udids.list.element: BYTE_ARRAY UTF8
uuids.list.element: BYTE_ARRAY UTF8

>>> firehose = pq.ParquetFile('<firehose object name>.parquet')
>>> firehose.metadata
<pyarrow._parquet.FileMetaData object at 0x10fc63458>
  created_by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
  num_columns: 4
  num_rows: 156
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1017
>>> firehose.schema
<pyarrow._parquet.ParquetSchema object at 0x10fc5e7b8>
udids.bag.array_element: BYTE_ARRAY UTF8
dfpts.bag.array_element: BYTE_ARRAY UTF8
uuids.bag.array_element: BYTE_ARRAY UTF8
uri: BYTE_ARRAY UTF8

模式差异可能是罪魁祸首吗?还有别的原因吗?

这两个特定文件不包含完全相同的数据,但根据我的Athena查询,Firehose文件中所有行的所有列表的总基数大约是Spark文件中的2.5倍。

编辑以添加:

我写了以下内容,基本上将每个拼花文件的内容转储到标准输出每行一行:

import sys
import pyarrow.parquet as pq

table = pq.read_table(sys.argv[1])
pydict = table.to_pydict()
for i in range(0, table.num_rows):
    print(f"{pydict['uri'][i]}, {pydict['dfpts'][i]}, {pydict['udids'][i]}, {pydict['uuids'][i]}")

然后,我对每个拼花地板文件运行它,并将输出传输到一个文件。以下是原始两个文件的大小、将上述python代码指向每个文件的输出,以及该输出的gzip版本:

-rw-r--r--  1 myuser  staff  1306337 Jun 28 16:19 firehose.parquet
-rw-r--r--  1 myuser  staff  8328156 Jul  2 15:09 firehose.printed
-rw-r--r--  1 myuser  staff  5009543 Jul  2 15:09 firehose.printed.gz
-rw-r--r--  1 myuser  staff  1233761 Jun 28 16:23 spark.parquet
-rw-r--r--  1 myuser  staff  3213528 Jul  2 15:09 spark.printed
-rw-r--r--  1 myuser  staff  1951058 Jul  2 15:09 spark.printed.gz

请注意,两个拼花地板文件的大小大致相同,但firehose文件的“打印”内容大约是spark文件中“打印”内容大小的2.5倍。它们的可压缩性差不多。

那么:如果不是原始数据,Spark拼花地板文件中的所有空间都被什么占据了?

编辑以添加:

下面是“拼花工具元”的输出。每列的压缩比看起来相似,但firehose文件中每个未压缩字节包含更多的值。对于“dfpts”列:

消防水管:

SZ:667849/904992/1.36 VC:161475

火花:

SZ:735561/1135861/1.54 VC:62643

拼花工具元输出:

file:            file:/Users/jh01792/Downloads/firehose.parquet 
creator:         parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 

file schema:     hive_schema 
--------------------------------------------------------------------------------
udids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
dfpts:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uuids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uri:             OPTIONAL BINARY L:STRING R:0 D:1

row group 1:     RC:156 TS:1905578 OFFSET:4 
--------------------------------------------------------------------------------
udids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:4 SZ:421990/662241/1.57 VC:60185 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 58, min/max not defined]
dfpts:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:421994 SZ:667849/904992/1.36 VC:161475 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 53, min/max not defined]
uuids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:1089843 SZ:210072/308759/1.47 VC:39255 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 32, min/max not defined]
uri:              BINARY GZIP DO:0 FPO:1299915 SZ:5397/29586/5.48 VC:156 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

file:        file:/Users/jh01792/Downloads/spark.parquet 
creator:     parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"uri","type":"string","nullable":false,"metadata":{}},{"name":"dfpts","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"udids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"uuids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
uri:         REQUIRED BINARY L:STRING R:0 D:0
dfpts:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
udids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
uuids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3

row group 1: RC:11 TS:1943008 OFFSET:4 
--------------------------------------------------------------------------------
uri:          BINARY GZIP DO:0 FPO:4 SZ:847/2530/2.99 VC:11 ENC:PLAIN,BIT_PACKED ST:[num_nulls: 0, min/max not defined]
dfpts:       
.list:       
..element:    BINARY GZIP DO:0 FPO:851 SZ:735561/1135861/1.54 VC:62643 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
udids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:736412 SZ:335289/555989/1.66 VC:23323 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
uuids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:1071701 SZ:160494/248628/1.55 VC:13305 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

共有2个答案

东郭自珍
2023-03-14

我能想到的两件事可以归因于这种差异。
1.镶木地板属性。
在Spark中,您可以使用以下片段找到与镶木地板相关的所有属性。
如果使用Hadoop配置设置属性,

import scala.collection.JavaConverters._

// spark = SparkSsssion
spark.sparkContext.hadoopConfiguration.asScala.filter {
  x =>
    x.getKey.contains("parquet")
}.foreach(println)

如果属性是使用Spark设置的(Spark defaults.conf--conf等)

spark.sparkContext.getConf.getAll.filter {
  case(key, value) => key.contains("parquet")
}.foreach(println)

如果我们能够得到消防水带(我不熟悉)的配置,我们可以做一个比较。否则,configs也应该给出可能出错的大致情况
2。Spark和FireHose之间使用的拼花地板版本不同
拼花社区可以在不同版本之间更改拼花配置的默认值。

堵飞鸿
2023-03-14

您可能应该以不同的方式提出您的问题:

为什么Firehose数据的压缩比Spark数据更有效?

对于拼花地板,你有几种可能的解释:

>

除了压缩方案外,拼花地板还尝试对您的值使用最有效的编码。特别是对于字节html" target="_blank">数组,默认情况下,它会尝试使用字典编码,即将每个不同的字节数组值映射到一个int,然后简单地将int存储在列数据中(此处有更多信息)。如果字典变得太大,它将回退到只存储字节数组值。

如果Firehose数据集包含的值的多样性比Spark数据集少得多,那么其中一个可能使用了有效的字典编码,而另一个则没有。

排序数据

排序后的数据通常比未排序的数据压缩得好得多,因此,如果Firehose列的值是自然排序的(或者至少是更频繁地重复),拼花编码和gzip压缩将获得更好的压缩比

不同的行组大小

拼花将值拆分为大小可调的行组(Spark中的parquet.block.size配置)。压缩和编码应用于行组级别,因此行组越大,压缩效果越好,但编码可能更差(例如,您可以从字典编码切换到纯byte_array值)和读取或写入时更高的内存要求。

如何了解您的情况?

使用拼花工具检查列的详细编码数据:

例如,在我的一个数据集上:

$ parquet-tools meta part-00015-6a77dcbe-3edd-4199-bff0-efda0f512d61.c000.snappy.parquet

...

row group 1:              RC:63076 TS:41391030 OFFSET:4
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:6042924 SZ:189370/341005/1,80 VC:269833 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

...

row group 2:              RC:28499 TS:14806649 OFFSET:11648146
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:13565454 SZ:78631/169832/2,16 VC:144697 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

列数据上的ENC属性提供用于列的编码(本例中为字典)SZ属性提供压缩大小/未压缩大小/压缩比VC编码值的数量。

在我的示例中,您可以看到,由于数据分布的原因,行组2中的压缩比略好于行组1。

更新:

查看您提供的统计数据,您可以看到数据集中的dfpts列的平均编码值大小为904992/161475=5.6字节,而spark版本的平均编码值大小为1135861/62643=18.13字节,尽管两者都是相同的字典编码。这可能意味着RLE在firehose数据集上的效率要高得多,因为您有很多重复值或不太明显的值。如果在保存到拼花地板之前在spark中对dfpts列进行排序,则很可能会获得与firehose数据类似的编码比率。

 类似资料:
  • 我有一个超过10亿行的DataFrame(df) 从上面的命令中,我了解到我的100个工作节点集群(spark 2.4.5)中只有5个工作节点将执行所有任务。使用聚结剂(5)需要7小时才能完成。 我应该尝试< code >重新分区而不是< code >联合? 有没有一种更快速/高效的方法来写出128 MB大小的拼花文件,或者我需要首先计算数据帧的大小来确定需要多少分区。 例如,如果我的数据帧大小为

  • 我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1

  • 现在Spark 2.4已经内置了对Avro格式的支持,我正在考虑将数据湖中某些数据集的格式从Parquet更改为Avro,这些数据集通常是针对整行而不是特定列聚合进行查询/联接的。 然而,数据之上的大部分工作都是通过Spark完成的,据我所知,Spark的内存缓存和计算是在列格式的数据上完成的。在这方面,Parquet是否提供了性能提升,而Avro是否会招致某种数据“转换”损失?在这方面,我还需要

  • 我有一个数据帧,它是由运行特定日期的每日批处理创建的,然后保存在HDFS(Azure Data Lake Gen 2)中。 它是用这样的东西保存的 如您所见,我没有对数据帧进行分区,因为它只包含一个日期。 例如,第一天的第一个文件将存储在文件夹中 交易/2019/08/25 然后第二天,它就会在文件夹里 贸易/2019/08/26 问题是,当所有数据都放好后,日期上的过滤器谓词是否仍会被按下,HD

  • 我一直在阅读有关此主题的一些问题以及几个论坛,并且在所有这些论坛中,他们似乎都提到从Spark中产生的每个. parket文件应该是64MB或1GB大小,但仍然无法让我想到哪些案例场景属于每个这些文件大小以及除了HDFS将它们拆分为64MB块之外的原因。 我当前的测试场景如下。 我目前每天处理2.5GB到3GB的数据,这些数据每年将被拆分并保存到每日存储桶中。n等于4或48的原因只是为了测试,因为

  • 问题内容: 有没有办法从Java创建镶木地板文件? 我的内存中有数据(java类),我想将其写入一个Parquet文件中,以便以后从apache-drill中读取它。 有没有简单的方法可以做到这一点,例如将数据插入sql表? 得到它了 谢谢您的帮助。 结合答案和此链接,我能够创建一个实木复合地板文件并用钻头将其读回。 问题答案: 不建议使用ParquetWriter的构造函数(1.8.1),但不建