当前位置: 首页 > 知识库问答 >
问题:

为什么CSV会根据文件大小读取Apache Spark分区以及如何更改分区?

吴德辉
2023-03-14

下面是我的pyspark代码:

csv_file = "/FileStore/tables/mnt/training/departuredelays02.csv"
schema   = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df = (spark
     .read
     .format("csv")                    
     .option("header","true")
     .schema(schema)
     .load(csv_file)                  
        )
partitions = df.rdd.getNumPartitions()
print(partitions)

现在,如果我尝试加载以下文件,该文件大得多,有1391578行:

csv_file = "/FileStore/tables/mnt/training/departuredelays.csv"

我得到一个8的分区。

我的问题是如何强制第一个CSV以与较大文件相同的方式进行分区。我知道可以使用重新分区,但我很想知道这是否可以在没有任何洗牌的情况下完成?而且即使我们重新划分它,它似乎创建了一个有3个任务而不是8个任务的作业。

df = df.repartition(8)
print(df.count())
(3) Spark Jobs
Job 93 View(Stages: 1/1)
Stage 123: 3/3
Job 94 View(Stages: 1/1, 1 skipped)
Stage 124: 0/3 skipped
Stage 125: 8/8
Job 95 View(Stages: 1/1, 2 skipped)
Stage 126: 0/3 skipped
Stage 127: 0/8 skipped
Stage 128: 1/1

共有1个答案

墨翔宇
2023-03-14

您可以尝试使用coalesce,这样可以进行合理的洗牌,而不是重新分区。

df = spark
     .read
     .format("csv")                    
     .option("header","true")
     .schema(schema)
     .load(csv_file)                  
     .coalesce(8)

有关Spark-repartition()vs coalesce()的更多信息,请参见此

 类似资料:
  • 问题内容: 我有一个很大的csv文件,因此无法将它们全部读入内存。我只想阅读和处理其中的几行内容。所以我正在Pandas中寻找一个可以处理此任务的函数,基本的python可以很好地处理此任务: 但是,如果我在熊猫中这样做,我总是会读第一行: 我正在寻找一些更简单的方法来处理熊猫中的这项任务。例如,如果我想读取1000到2000的行。如何快速执行此操作? 我想使用熊猫,因为我想将数据读入数据框。 问

  • 我正在尝试动态地为需要读入R的文件创建名称。我得到了意想不到的结果,因为我构造文件名的两种方法产生了不同的结果,尽管包含文件名的字符向量似乎是相同的,但R对它们的处理是不同的。 你能帮我理解这是为什么吗?我应该如何在R中创建(动态)文件名? 创建于 2022-06-20 由 reprex 软件包 (v2.0.1) 我用的是Ubuntu 22.04操作系统上的R 4.2.0。

  • 问题内容: 我有一个查询sqlite3数据库,该数据库提供了已排序的数据。数据是根据“名称”列进行排序的。现在当我执行查询时 它提供了这样的数据。 表示正在考虑区分大小写的内容。我想要的方式如下 那么,对于必要的结果,我应该在sqlite3数据库中进行哪些更改? 相关如何在字符串比较时将Sqlite3设置为不区分大小写? 问题答案: SQLite数据类型文档讨论了用户定义的排序规则序列。具体来说,

  • 问题内容: 我的Mysql数据库中有一个表,该表用于身份验证。现在,我需要使身份验证区分大小写。在谷歌搜索中,我已经意识到Mysql列对于搜索操作是不区分大小写的(与Oracle相反),并且在创建表时可以通过指定“ binary”即更改默认行为。 有人可以告诉我如何更改Mysql中的表以将“二进制”添加到数据库的现有列吗? 谢谢! 问题答案:

  • 问题内容: 基本上,我要进行下一步。 请参阅此相关问题。我想每100行发送一次处理行,以实现批量分片。 有关实现相关答案的问题是csv对象无法下标并且不能使用len。 我该如何解决? 问题答案: 只需将您的下标包装到即可。显然,这会在大型文件上中断(请参见下面的 更新 中的替代方法): 进一步阅读:如何在Python中将列表分成均匀大小的块? 更新1 (列表版本):另一种可能的方法是处理每个卡盘,

  • 我正在使用的一个数据湖()有2 TB的数据和20000个文件。我想把数据集压缩成2000 GB的文件。 如果运行并写入磁盘,数据湖包含1.9 TB的数据。 如果运行并写入磁盘,数据湖包含2.6 TB的数据。 数据湖中的每个文件比预期的大0.3 GB(它们都是1.3 GB的文件,而不是1 GB的文件)。 为什么方法会增加整个数据湖的大小? 还有一个相关的问题讨论了为什么在运行聚合后数据湖的大小会增加