当前位置: 首页 > 知识库问答 >
问题:

使用配置单元的pyspark,append将添加到现有分区并复制数据

万喜
2023-03-14

我目前正在使用adwords api,我必须处理1天、7天和30天的数据。因此,spark作业是基本的,加载csv并将其写入带有分区的拼花地板:

df.write
  .mode("append")
  .format("parquet")
  .partitionBy("customer_id", "date")
  .option("path", warehouse_location+"/"+table)
  .saveAsTable(table)

现在我面临的问题是,7和30天将在某个时候(通过1天前)处理已经处理过的数据,因此在我的分区table/customer_id/date/file.parquet上,追加将追加第二个拼花文件到这个分区。

但在这种特定情况下,我希望新的拼花文件覆盖上一个文件(因为adwords csv将在生成的第一天到7/30天后进行更改)。

我环顾四周,如果我尝试使用“覆盖”,它会覆盖整个表,而不仅仅是分区。

你对如何在这里进行有什么建议吗?

我不是Spark专家,现在我唯一想做的就是有一个脚本,可以根据文件时间戳清理这个地方。但这似乎不是正确的解决方案。

PS:我用的是Spark 2.4

共有1个答案

翟迪
2023-03-14

根据SPARK-20236,您应该设置SPARK。sql。来源。partitionOverwriteMode=“动态”属性,然后使用“覆盖”模式替换现有表中的各个分区。

 类似资料:
  • 无法通过jupyter笔记本使用pyspark将数据写入hive。 给我下面的错误 Py4JJavaError:调用o99.saveAsTable时发生错误。:org.apache.spark.sql.分析异常:java.lang.运行时异常:java.lang.运行时异常:无法实例化org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreCl

  • 我正在使用Spark2.0,我想知道,是否可以列出特定配置单元表的所有文件?如果是这样,我可以直接使用spark增量地更新这些文件。如何向配置单元表添加新分区?有没有关于蜂巢转移瘤的api我可以从Spark使用? 有什么方法可以获得映射dataframe的内部配置单元函数吗 我的主要理由是对表进行增量更新。现在,我知道的唯一方法是SQL+,这不是很有效,因为他将覆盖所有表,而我主要感兴趣的是对某些

  • 我试图为我的表创建分区,以便更新一个值。 这是我的样本数据 我想把珍妮特的部门更新到B。 为此,我创建了一个以Department为分区的表。 创建外部表trail(EmployeeID Int、FirstName String、Designation String、Salary Int),按(Department String)行格式分隔字段进行分区,以“,”location'/user/sre

  • 随着分区的增长,对于一个表来说,这个语句所花费的时间要长得多(有些时候超过5分钟)。我知道它会扫描和解析s3中的所有分区(我的数据就在那里),然后将最新的分区添加到hive Messagore中。 我想用ALTER TABLE ADD PARTITION语句替换MSCK REPAIR。MSCK修复在添加最新分区时工作得非常好,但是我在使用ALTER TABLE ADD partition时遇到了分

  • 查询示例: 典型错误消息: 处理语句时出错:失败:执行错误,从org.apache.hadoop.hive.ql.exec.mr.MapredTask返回代码2 问题2:当我运行命令?我是否只运行相同的命令,但使用STRING而不是bigint?**完整错误消息:**