当前位置: 首页 > 知识库问答 >
问题:

spark在S3上的分区内创建分区

惠志
2023-03-14

我有以下制表符分隔的示例数据集:

col1  period  col3  col4  col5  col6  col7  col8  col9  col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22
ASSDF 202001  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202002  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202003  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202004  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
...
...
ASSDF 202312  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99

我正在对此数据运行一些转换,最终数据位于spark dataset“ds1”中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。

我的scala函数来保存TSV数据集。

def saveTsvDataset(dataframe: DataFrame, outputFullPath: String, numPartitions: Integer, partitionCols: String*): Unit = {
    dataframe
      .repartition(numPartitions)
      .write
      .partitionBy(partitionCols:_*)
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .csv(outputFullPath)
  }

在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod。

 saveTsvDataset(
      DS1.withColumn("datasetPeriod",$"period")
      , "s3://s3_path"
      , 100
      , "period"
    )

现在,我的问题是我的周期从202001到202312,当我在s3上使用“DataSetPeriod”上的分区编写时,有时它会在任何随机的周期内创建分区。所以这在任何时期都是随机发生的。我从来没有看到这种情况发生了多个时期。它创建类似“s3://s3_path/datasetperiod=202008/datasetperiod=202008”的路径。

共有1个答案

严柏
2023-03-14

DataFrame中已经有句点列。因此不需要再创建一个新的重复的DataSetPeriode列。

当您使用.PartitionBy(“period”)将DataFrame写入S3:/../ParentFolder时,它将创建如下所示的文件夹:

df.write.partitionBy("period").csv("s3://../parentFolder/")
s3://.../parentFolder/period=202001/
s3://.../parentFolder/period=202002/
s3://.../parentFolder/period=202003/
...
s3://.../parentFolder/period=202312/

在回读数据时,只需提到路径tillparentfolderonly,它将自动读取periode作为列之一。

val df = spark.read.csv("s3://../parentFolder/")
//df.schema will give you `period` as one of the column
df.printSchema
root
 |-- col1: string (nullable = true)
 |-- .... //other columns go here
 |-- period: string (nullable = true)

也就是说,无论您在partition列中获得多个分区,都只是由于使用PartitionBy写入数据时使用了错误的路径。

 类似资料:
  • 这是将Spark dataframe保存为Hive中的动态分区表的后续操作。我试图在答案中使用建议,但无法在Spark 1.6.1中使用 任何推动这一进程的帮助都是感激的。 编辑:还创建了SPARK-14927

  • 我创建一个数据文件,导入一个大约8MB的csv文件,如下所示: 最后,我打印dataframe的分区数: 答案是2。

  • 假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?

  • 即使它是Hive表或HDFS文件,当Spark读取数据并创建数据帧时,我也在想RDD /数据帧中的分区数将等于HDFS中的部分文件数。但是,当我使用 Hive 外部表进行测试时,我可以看到该数字与 部件文件的数量 不同。数据帧中的分区数为 119。该表是一个 Hive 分区表,其中包含 150 个部分文件,最小文件大小为 30 MB,最大大小为 118 MB。那么是什么决定了分区的数量呢?

  • 问题内容: 我正在尝试使用HiveCLI上的动态分区从另一个表创建一个新表。我正在从Hive官方Wiki学习,这里有以下示例: 但是我收到了这个错误: 失败:SemanticException [错误10065]: CREATE TABLE AS SELECT命令无法指定目标表的列列表 资料来源:https : //cwiki.apache.org/confluence/display/Hive/

  • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?