当前位置: 首页 > 知识库问答 >
问题:

pyspark-分区数据的计算(使用“附加”模式创建)缓慢

席嘉祯
2023-03-14

我在分区后的查询上有一个性能问题。

我每天有一个大约3000万行20列的拼花文件。例如,文件data\u 20210721。拼花地板看起来像:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

我们有一个代码来处理它,以便只有一天,并缩短午夜,这样我们就可以:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A         | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

第2行可以称为残差,因为它与文件不在同一天。

然后,我们希望每天生成1个拼花地板,因此默认的解决方案是处理每个文件并使用以下内容保存数据框:

df.write.partitionBy(["id", "daytime"]).mode("append").parquet("hdfs/path")

模式设置为追加,因为第二天,我们可能会有过去/未来几天的残差。

还有其他级别的分区,例如:

  • ID:固定一年左右(节省存储空间挺好;)
  • 周数
  • 国家/地区

即使分区在行方面相当“平衡”,处理时间也变得非常慢。

例如,要计算给定日期集每天的行数,请执行以下操作:

  • 原始df(7秒):
spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
  • 分区数据(几分钟)
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()

我们认为在最后一级有太多的小分区(由于追加,大约有600个很少Kb/Mb的非常小的文件),所以我们尝试为每个分区合并它们,但没有任何改进。我们还尝试只在白天进行分区(以防多个级别的分区会产生问题)。

是否有任何解决方案可以提高性能(或了解瓶颈在哪里)?它是否可以与我们正在划分日期列的事实相联系?例如,我看到了很多按年/月/日划分的示例,它们是3个整数,但不符合我们的需要。

这个解决方案非常适合解决我们遇到的许多问题,但是如果性能损失太重要而无法保持原样,欢迎任何建议:)

这些问题源于以下两个方面的计划不同:

spark.read.parquet("path/to/data/DayTime=2021-07-10")

spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")

下面是一个小示例的计划,其中白天被转换为“长”,因为我认为可能是由于数据类型导致了缓慢:

spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)

== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
      +- Relation[date_from#4297,date_to#4298, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ts: int, ....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#4297,date_to#4298,ts#4308, ....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf], PartitionCount: 1, PartitionFilters: [isnotnull(ts#4308), (ts#4308 = 20200103)], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....

vs公司

spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)

== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp,, ....] parquet

== Optimized Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#2086,date_to#2087, .....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf/ts=20200103], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....

提前感谢,

尼古拉斯

共有1个答案

尉迟清野
2023-03-14

您必须确保过滤器实际使用的是分区结构,在磁盘级别进行修剪,而不是将所有数据放入内存,然后应用过滤器。

试着检查你的身体计划

spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()

它应该有一个类似于分区过滤器的阶段:[isnotnull(日间#123),(日间#76=您的状况)],

我的猜测是在您的情况下,它没有使用此分区过滤器并且扫描了整个数据。

我建议尝试使用小型数据集尝试您的语法/重新分区策略,直到您实现分区过滤器

 类似资料:
  • 如何在sql like partition上通过条件连接两个Pyspark数据帧?我实际上需要连接两个数据框,以便对于每个组(基于一个列变量),我与另一个表外部连接。 例如,我有以下两个数据帧: DF1: df2: 所需输出如下: 我尝试过使用Pyspark的< code>Window操作符,但是没有成功,因为它不能用于连接一个窗口。 任何帮助将不胜感激。

  • 问题内容: 对于一个宠物项目,数据库进入了顶峰, 元数据 达到了顶峰,我很难理解该命令与MySQL命令之间的区别(如果有)。 有什么区别吗?如果不是这样,这是关系数据库行为的一种相当典型的模式(我听说过,对于其他数据库(例如Oracle),模式存在于数据库中,而不是与数据库处于同一级别)。 谢谢! 问题答案: MySQL的文档说: CREATE DATABASE创建具有给定名称的数据库。要使用此语

  • 我正在尝试创建一个mysql数据库/模式,如果它还不存在的话。 以下是我尝试过的: docker编写。yml公司 创建数据库 它不起作用。如果架构不存在,使用docker/docker compose创建架构的最佳方式是什么?

  • 我正在编写一个脚本,该脚本的目的是:基于包含后缀日期时间的旧表设置表名(例如,table_1_2020_01_01),基于旧表的架构为该表创建架构,然后从旧表向该表插入数据。 我创建了三个MySQL语句,用于设置表名,创建表,然后将数据插入到创建的表中: 但是,每次我运行我的运行我的func从主,我收到这个错误: 我在golang中发现Set表名是一个var。所以我的解决方案是使用准备好的报表,并

  • 我正在测试在一个正在运行的系统中添加Kafka分区,但我不清楚如果您将分区添加到一个现有的主题中,Kafka如何管理现有的数据。 例如: 我有一个主题为的Kafka实例,有一个分区和一个副本。 生产者组开始插入该主题,消费者组开始消费。 我更改主题以添加另一个分区。 在本例中,主题数据发生了什么?是在两个分区之间重新平衡,还是只有新生成的数据才会使用新分区?

  • 行动时刻 - 使用unlang创建数据计数器 我们首先必须确保某些事情到位,以便这项工作取得成功。 应首先完成以下项目作为准备: 在字典中定义自定义属性。 创建将由FreeRADIUS perl模块使用的Perl脚本。 更新Mikrotk和Chillispot词典。 准备用户文件。 准备SQL数据库。 将unlang代码添加到虚拟服务器以充当数据计数器。 如果存在,则识别LD_PRELOAD错误。