当前位置: 首页 > 知识库问答 >
问题:

附加到镶木地板文件,由具有重叠时间戳的数据分区

慕宏博
2023-03-14

我的数据帧有时间戳列。我将其转换为日期,按日期进行分区,并每天将其附加到不断增长的拼花文件中。

如果我附加一个时间戳从2021 04月19日01:00:01到2021 04月19日13:00:00的数据集,它会将其写入分区中的拼花地板。

如果在当天晚些时候,我附加另一个时间戳从2021 04月19日15:00:00到2021 04月19日20:00:00的数据集,它会覆盖之前的分区吗?该分区的数据从凌晨1点到下午1点?或者它真的会附加到它上面吗?

我使用以下语法:

df.write.mode('append').partitionBy("DATE").parquet("s3://path")

共有2个答案

锺威
2023-03-14

按照mck的建议(在尝试之前你不会知道),我做到了,而且正如担心的那样,它基本上用新数据重写了整个分区。

我考虑了一下,决定总是重新流化前一天的内容,并在分区中覆盖。在我的情况下,它对我有效,因为我确实可以访问5天的缓冲区数据,我可以重新提取这些数据。但这种解决方案不适用于那些只有几个小时左右的瞬态数据的用户。

胡浩瀚
2023-03-14

从Spark文档中的保存模式:

追加:将数据帧保存到数据源时,如果数据/表已经存在,则数据帧的内容将追加到现有数据中。

因此,它符合您的期望。下面是一个检查行为的玩具示例:

data_batch_1 = [("2021-04-19", "2021-04-19 01:00:01", 1.1), 
                ("2021-04-19", "2021-04-19 13:00:00", 1.2)]

data_batch_2 = [("2021-04-19", "2021-04-19 15:00:00", 2.1), 
                ("2021-04-19", "2021-04-19 20:00:00", 2.2)]

col_names = ["DATE", "ts", "sensor1"]

df_batch_1 = spark.createDataFrame(data_batch_1, col_names)
df_batch_2 = spark.createDataFrame(data_batch_2, col_names)

s3_path = "/tmp/67163237/"

保存批次1

df_batch_1.write.mode("append").partitionBy("DATE").parquet(s3_path)
spark.read.parquet(s3_path).show()
+-------------------+-------+----------+
|                 ts|sensor1|      DATE|
+-------------------+-------+----------+
|2021-04-19 01:00:01|    1.1|2021-04-19|
|2021-04-19 13:00:00|    1.2|2021-04-19|
+-------------------+-------+----------+

保存批次2

df_batch_2.write.mode("append").partitionBy("DATE").parquet(s3_path)
spark.read.parquet(s3_path).show()
+-------------------+-------+----------+
|                 ts|sensor1|      DATE|
+-------------------+-------+----------+
|2021-04-19 15:00:00|    2.1|2021-04-19|
|2021-04-19 01:00:01|    1.1|2021-04-19|
|2021-04-19 20:00:00|    2.2|2021-04-19|
|2021-04-19 13:00:00|    1.2|2021-04-19|
+-------------------+-------+----------+
 类似资料:
  • 有一个AWS胶水爬虫,它正在创建一个包含拼花文件的S3目录中所有表的数据目录。 我需要将这些文件/表的内容复制到Redshift表。我有几个表,Redshift无法支持Parket文件数据大小。是不够的。 在理想情况下,希望截断这些表。 如何使用COPY命令将此数据加载到红移中?如果我使用spectrum,我只能使用户从外部表插入到红移表,我知道这比批量复制慢?

  • 我有一个数据帧,它是由运行特定日期的每日批处理创建的,然后保存在HDFS(Azure Data Lake Gen 2)中。 它是用这样的东西保存的 如您所见,我没有对数据帧进行分区,因为它只包含一个日期。 例如,第一天的第一个文件将存储在文件夹中 交易/2019/08/25 然后第二天,它就会在文件夹里 贸易/2019/08/26 问题是,当所有数据都放好后,日期上的过滤器谓词是否仍会被按下,HD

  • 问题内容: 有没有办法从Java创建镶木地板文件? 我的内存中有数据(java类),我想将其写入一个Parquet文件中,以便以后从apache-drill中读取它。 有没有简单的方法可以做到这一点,例如将数据插入sql表? 得到它了 谢谢您的帮助。 结合答案和此链接,我能够创建一个实木复合地板文件并用钻头将其读回。 问题答案: 不建议使用ParquetWriter的构造函数(1.8.1),但不建

  • 我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1

  • 问题内容: 我正在使用Maven组装插件组装胖子罐,并遇到以下问题: 这是pom.xml: 如果我在Intellij IDEA中运行它,则不会发生此问题。 我还应该在罐子中添加什么才能找到课程? 问题答案: 我找到了解决问题的办法。我尝试使用构建程序包,并遇到了其他但相关的问题。我在这里找到的解决方案:https : //stackoverflow.com/a/27532248/5520896也有

  • 我有一个超过10亿行的DataFrame(df) 从上面的命令中,我了解到我的100个工作节点集群(spark 2.4.5)中只有5个工作节点将执行所有任务。使用聚结剂(5)需要7小时才能完成。 我应该尝试< code >重新分区而不是< code >联合? 有没有一种更快速/高效的方法来写出128 MB大小的拼花文件,或者我需要首先计算数据帧的大小来确定需要多少分区。 例如,如果我的数据帧大小为