当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中向现有分区添加行?

张光辉
2023-03-14

我必须更新历史数据。说到更新,我的意思是向S3上的现有分区添加新行,有时是新列。

当前分区按日期实现:创建\u年={}/创建\u月={}/创建\u日={}。为了避免每个分区有太多对象,我执行以下操作来维护单个对象/分区:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

存在这样一种情况,我必须添加具有以下列值的某些行:

created_year | created_month | created_day
2019         |10             |27   

这意味着此路径中的文件(S3对象):created\u year=2019/created\u month=10/created\u day=27/some\u random\u name。拼花地板将追加新行。

如果模式中有更改,那么所有对象都必须实现该更改。

我试着研究这通常是如何工作的,因此,有两种感兴趣的模式:覆盖、附加。

第一个将只添加当前数据并删除其余数据。我不希望出现这种情况。第二个将附加,但最终可能会创建更多对象。我也不希望出现这种情况。我还读到数据帧在Spark中是不可变的。

那么,如何实现将新数据附加到现有分区并每天维护一个对象呢?


共有1个答案

卢嘉誉
2023-03-14

根据您的问题,我理解您需要在不增加拼花文件数量的情况下向存量数据添加新行。这可以通过对特定分区文件夹进行操作来实现。执行此操作时可能有三种情况。

这意味着传入数据在分区列中有一个新值。在您的情况下,这可能类似于:

现有数据

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  1  |

新建数据

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  2  |

因此,在这种情况下,您可以为传入的数据创建一个新的分区文件夹,并按原样保存它。

partition_path = "/path/to/data/year=2020/month=1/day=2"
new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

这是您要将新行附加到存量数据的位置。它可以是这样的:

现有数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

新建数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  b  |   1   |

在这里,我们为同一个分区创建了一条新记录。您可以使用“追加模式”,但您希望每个分区文件夹中都有一个拼花文件。这就是为什么您应该先读取现有分区,将其与新数据合并,然后将其写回。

partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.unionByName(new_data)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

如果传入的数据是更新,而不是插入,该怎么办?在这种情况下,您应该更新一行,而不是插入一个新行。想象一下:

现有数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

新建数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   2   |

“a”之前的值为1,现在我们希望它为2。因此,在这种情况下,您应该读取现有数据并更新现有记录。这可以通过如下方式实现。

partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
write_data = old_data.join(new_data, ["year", "month", "day", "key"], "outer")
write_data = write_data.select(
    "year", "month", "day", "key",
    F.coalesce(new_data["value"], old_data["value"]).alias("value")
)
write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

当我们将旧数据和新数据连接在一起时,可能有四件事,

  • 两个数据具有相同的值,取哪个并不重要
  • 两个数据有不同的值,取新值
  • 旧数据没有价值,新数据有,取新
  • 新数据没有值,旧数据有,取旧

为了实现我们在这里的愿望,我们从pyspark中联合起来。sql。函数将完成此工作。

请注意,此解决方案还包括第二种情况。

Spark支持拼花文件格式的模式合并。这意味着您可以在数据中添加列或删除列。当您添加或删除列时,您会意识到从顶层读取数据时某些列不存在。这是因为Spark默认禁用模式合并。从留档:

与Protocol Buffer、Avro和Thrift一样,Parque也支持模式演进。用户可以从一个简单的模式开始,然后根据需要逐渐向模式中添加更多列。这样,用户最终可能会得到多个具有不同但相互兼容模式的Parket文件。Parque数据源现在能够自动检测这种情况并合并所有这些文件的模式。

要能够读取所有列,您需要将mergeSchema选项设置为true

df = spark.read.option("mergeSchema", "true").parquet(path)
 类似资料:
  • 我正在表任务中添加新列名标题。但我得到一个错误,该表中不存在此列。谁能帮我解决那个错误。这是我的密码: 然后添加此代码 到创建的新表文件

  • 这是如何创建新的10个主题分区的例子- 现在我们想在主题名称中添加10个额外的分区- 如何在现有的10个分区中添加额外的分区?

  • 作为一个具体的示例,假设我有一个工作簿和工作表,其中有一个从A1开始的2列2行(包括标题)的现有表。我可以打开底层的XL>Tables>Table1.xml并看到以下内容: 我可以看到我的两列、根标记的ref属性以及autoFilter块的ref属性。我想要做的是添加一个新行,这样表的面积将是A1:B3。

  • 问题内容: 我想在Route和Router类型上添加一个便捷的util方法: 但是编译器告诉我 无法在非本地类型mux.Router上定义新方法 那么我将如何实现呢?是否创建具有匿名mux.Route和mux.Router字段的新结构类型?或者是其他东西? 问题答案: 正如编译器所提到的,您不能在另一个包中扩展现有类型。您可以定义自己的别名或子包,如下所示: 或嵌入原始路由器:

  • 问题内容: 我有一个名为“ Person”的表名,下面是列名 我忘了约束。 现在,我尝试使用以下查询将“ 约束” 添加到名为的现有列中, 我收到语法错误…。 问题答案: 只需使用查询并将其添加到您现有的列定义中即可。例如: 请注意:使用查询时,您需要再次指定 完整的 列定义。例如,如果您的列具有值或列注释,则需要在语句中与数据类型和一起指定它,否则它将丢失。防止此类情况发生的最安全方法是从查询的输

  • 问题内容: 我已经使用python创建了一个txt文件,其中包含几行文本,这些文本将由一个简单的程序读取。但是,我在重新打开文件以及在程序的后续部分中在文件中写入其他行时遇到了一些麻烦。(这些行将从稍后获得的用户输入中写入。) 这是假设“ file.txt”已被打开并被写入。但是,使用我当前拥有的代码第二次打开该文件时,我必须擦除之前编写的所有内容并重写新行。有没有办法防止这种情况发生(并可能减少