我有以下制表符分隔的示例数据集:
col1 period col3 col4 col5 col6 col7 col8 col9 col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22
ASSDF 202001 A B BFGF SSDAA WDSF SDSDSD SDSDSSS SDSDSD E F FS E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202002 A B BFGF SSDAA WDSF SDSDSD SDSDSSS SDSDSD E F FS E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202003 A B BFGF SSDAA WDSF SDSDSD SDSDSSS SDSDSD E F FS E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202004 A B BFGF SSDAA WDSF SDSDSD SDSDSSS SDSDSD E F FS E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
...
...
ASSDF 202312 A B BFGF SSDAA WDSF SDSDSD SDSDSSS SDSDSD E F FS E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
我正在对此数据运行一些转换,最终数据位于spark dataset“ds1”
中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。
我的scala函数来保存TSV数据集。
def saveTsvDataset(dataframe: DataFrame, outputFullPath: String, numPartitions: Integer, partitionCols: String*): Unit = {
dataframe
.repartition(numPartitions)
.write
.partitionBy(partitionCols:_*)
.mode(SaveMode.Overwrite)
.option("sep", "\t")
.csv(outputFullPath)
}
在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod。
saveTsvDataset(
DS1.withColumn("datasetPeriod",$"period")
, "s3://s3_path"
, 100
, "period"
)
现在,我的问题是我的周期从202001到202312,当我在s3上使用“DataSetPeriod”上的分区编写时,有时它会在任何随机的周期内创建分区。所以这在任何时期都是随机发生的。我从来没有看到这种情况发生了多个时期。它创建类似“s3://s3_path/datasetperiod=202008/datasetperiod=202008”
的路径。
DataFrame中已经有句点
列。因此不需要再创建一个新的重复的DataSetPeriode
列。
当您使用.PartitionBy(“period”)
将DataFrame写入S3:/../ParentFolder
时,它将创建如下所示的文件夹:
df.write.partitionBy("period").csv("s3://../parentFolder/")
s3://.../parentFolder/period=202001/
s3://.../parentFolder/period=202002/
s3://.../parentFolder/period=202003/
...
s3://.../parentFolder/period=202312/
在回读数据时,只需提到路径tillparentfolder
only,它将自动读取periode
作为列之一。
val df = spark.read.csv("s3://../parentFolder/")
//df.schema will give you `period` as one of the column
df.printSchema
root
|-- col1: string (nullable = true)
|-- .... //other columns go here
|-- period: string (nullable = true)
也就是说,无论您在partition列中获得多个分区,都只是由于使用PartitionBy写入数据时使用了错误的路径。
这是将Spark dataframe保存为Hive中的动态分区表的后续操作。我试图在答案中使用建议,但无法在Spark 1.6.1中使用 任何推动这一进程的帮助都是感激的。 编辑:还创建了SPARK-14927
我创建一个数据文件,导入一个大约8MB的csv文件,如下所示: 最后,我打印dataframe的分区数: 答案是2。
假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?
即使它是Hive表或HDFS文件,当Spark读取数据并创建数据帧时,我也在想RDD /数据帧中的分区数将等于HDFS中的部分文件数。但是,当我使用 Hive 外部表进行测试时,我可以看到该数字与 部件文件的数量 不同。数据帧中的分区数为 119。该表是一个 Hive 分区表,其中包含 150 个部分文件,最小文件大小为 30 MB,最大大小为 118 MB。那么是什么决定了分区的数量呢?
问题内容: 我正在尝试使用HiveCLI上的动态分区从另一个表创建一个新表。我正在从Hive官方Wiki学习,这里有以下示例: 但是我收到了这个错误: 失败:SemanticException [错误10065]: CREATE TABLE AS SELECT命令无法指定目标表的列列表 资料来源:https : //cwiki.apache.org/confluence/display/Hive/
当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?