当前位置: 首页 > 知识库问答 >
问题:

在数据砖自动加载器中处理重复项

山煜祺
2023-03-14

我是这个Databricks自动加载器的新手,我们有一个要求,我们需要通过Databricks自动加载器处理从AWS s3到delta表的数据。我正在测试这个自动加载程序,所以我遇到了重复的问题,即如果我上传一个文件名为emp_09282021.csv的文件,该文件与emp_09272021.csv文件具有相同的数据,那么它没有检测到任何重复,它只是简单地插入它们,所以如果我在emp_09272021.csv文件中有5行,现在当我上传emp_09282021.csv文件时,它将变成10行。

下面是我尝试的代码:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start("s3://some-s3-path/spark_stream_processing/target/")

请指导我如何处理这件事?

共有1个答案

张茂勋
2023-03-14

自动加载器的任务不是检测副本,它为您提供摄取数据的可能性,但您需要自己处理副本。对此有几种方法:

    < li >使用内置的< code>dropDuplicates函数。建议将它与水印一起使用,以避免创建一个巨大的状态,但您需要有一些列将被用作事件时间,并且它应该是< code>dropDuplicate列表的一部分(有关更多详细信息,请参见文档):
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("col1", "eventTime")
  • 使用Delta的合并功能-您只需要插入Delta表中没有的数据,但需要使用foreachBatch。类似这样的内容(请注意,表应该已经存在,或者您需要添加对不存在表的处理):
from delta.tables import *

def drop_duplicates(df, epoch):
  table = DeltaTable.forPath(spark, 
      "s3://some-s3-path/spark_stream_processing/target/")
   dname = "destination"
   uname = "updates"
   dup_columns = ["col1", "col2"]
   merge_condition = " AND ".join([f"{dname}.{col} = {uname}.{col}"
      for col in dup_columns])
   table.alias(dname).merge(df.alias(uname), merge_condition)\
     .whenNotMatchedInsertAll().execute()

# ....
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header",True) \
  .schema("id string,name string, age string,city string") \
  .load("s3://some-s3-path/source/") \
  .writeStream.foreachBatch(drop_duplicates)\
  .option("checkpointLocation", "s3://some-s3-path/tgt_checkpoint_0928/") \
  .start()

在这段代码中,您需要更改< code>dup_columns变量来指定用于检测重复项的列。

 类似资料:
  • 我已经在我的容器中压缩了文件,我每天都会得到一个或多个文件,当它们进来时,我想处理这些文件。我有一些问题。 > 我可以使用Databricks自动加载功能来处理zip文件吗?Autoloader是否支持zip文件? 使用Autoloader需要启用哪些设置?我有我的容器和sas令牌。 一旦zip文件被处理(解压缩,读取zip文件中的每个文件),我就不应该再次读取zip。当我使用自动加载器时,我该怎

  • 我正在使用 Azure Blob 存储来存储数据,并使用装载将此数据馈送到自动加载程序。我正在寻找一种方法来允许自动加载器从任何装载加载新文件。假设我的装载中有这些文件夹: mnt/ ├─ blob_container_1 ├─ blob_container_2 当我使用 .load('/mnt/') 时,没有检测到新文件。但是当我单独考虑文件夹时,它像.load('/mnt/blob_conta

  • 我有两个列表:一个包含产品,另一个包含相关价格。列表可以包含未定义数量的产品。列表的示例如下所示: 产品:[“苹果”、“苹果”、“苹果”、“橙子”、“香蕉”、“香蕉”、“桃子”、“菠萝”、“菠萝”] 价格:['1.00','2.00','1.50','3.00','0.50','1.50','2.00','1.00','1.00'] 我希望能够从产品列表中删除所有重复的产品,并只保留与价格列表中唯

  • 译者:yportne13 作者:Sasank Chilamkurthy 在解决机器学习问题的时候,人们花了大量精力准备数据。pytorch提供了许多工具来让载入数据更简单并尽量让你的代码的可读性更高。在这篇教程中,我们将学习如何加载和预处理/增强一个有价值的数据集。 在运行这个教程前请先确保你已安装以下的包: scikit-image: 图形接口以及变换 pandas: 便于处理csv文件 fro

  • 问题内容: 有没有办法让IPython自动重新加载所有更改的代码?在外壳中执行每行之前,或者在明确要求时失败。我正在使用IPython和SciPy进行很多探索性编程,每当更改模块时都必须手动重新加载每个模块,这是很痛苦的。 问题答案: 对于IPython版本3.1、4.x和5.x 然后,您的模块将默认 自动重新加载 。这是文档: 有一个窍门:当您使用时 忘记 以上 所有 内容时,请尝试:

  • 这篇文章是对 PHP自动加载功能 的一个总结,内容涉及 PHP自动加载功能 、PHP命名空间、PSR0/PSR4标准 等内容。 一、PHP 自动加载功能 PHP 自动加载功能的由来 在 PHP 开发过程中,如果希望从外部引入一个 Class ,通常会使用 include 和 require 方法,去把定义这个 Class 的文件包含进来。这个在小规模开发的时候,没什么大问题。但在大型的开发项目中,