当前位置: 首页 > 知识库问答 >
问题:

Databricks Delta实时表-应用来自Delta表的更改

师建德
2023-03-14

我正在使用Databricks Delta Live表,但是在向上插入一些表时遇到了一些问题。我知道下面的文字很长,但我还是尽可能清晰地描述了我的问题。如果有些部分不清楚,请告诉我。

我有下面的表格和流程:

着陆区(_Z)-

要从着陆区转到原始表,我有以下Pysark代码:

cloudfile = {"cloudFiles.format":"JSON", 
                       "cloudFiles.schemaLocation": sourceschemalocation, 
                       "cloudFiles.inferColumnTypes": True}

@dlt.view('landing_view')
def inc_view():
    df = (spark
             .readStream
             .format('cloudFiles')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')

此代码按预期方式工作。我运行它,添加更改。JSON文件到登陆区域,重新运行管道,更新插入正确应用于“raw_table”

(但是,每次在delta文件夹中创建一个包含所有数据的新的parquet文件时,我希望只添加一个包含插入和更新行的parquet文件。关于当前版本的一些信息保存在增量日志中?不确定这是否与我的问题相关。我已经将' raw_table '的table_properties更改为enableChangeDataFeed = true。“intermediate_table”的readStream则具有option(readChangeFeed,“true”)。

然后,我有下面的代码从我的“raw_table”转到我的“intermediate_table”:

@dlt.table(name='V_raw_table', table_properties={delta.enableChangeDataFeed': 'True'})
def raw_table():
     df = (spark.readStream
                .format('delta')
                .option('readChangeFeed', 'true')
                .table('LIVE.raw_table'))
     df = df.withColumn('ExtraCol', <Transformation>)
     return df
 ezeg
dlt.create_target_table('intermediate_table')
dlt.apply_changes(target='intermediate_table',
                  source='V_raw_table',
                  keys=['id'],
                  sequence_by='updated_at')

不幸的是,当我运行这个命令时,我得到了一个错误:‘在版本2的源表中检测到一个数据更新(例如part-00000-7127 BD 29-6820-406 c-a5a 1-e76fc 7126150-c000 . snappy . parquet)。目前不支持此功能。如果您想忽略更新,请将选项“ignoreChanges”设置为“true”。如果您希望反映数据更新,请使用新的检查点目录重新启动此查询。

我签入了“忽略更改”,但不要认为这是我想要的。我希望自动加载器能够检测到增量表中的更改,并将它们传递到流中。

我知道readStream只与append一起工作,但这就是为什么我希望在“raw_table”被更新后,一个新的parquet文件将被添加到delta文件夹中,只有插入和更新。然后,这个添加的拼花文件被自动加载器检测到,并可用于将更改应用到“intermediate_table”中。

我这样做不对吗?还是我忽略了什么?提前感谢!

共有2个答案

王云
2023-03-14

当您更改设置以包含选项“. ption”(“readChangeFeed”,“true”)“时,您应该从完全刷新开始(开始附近有下拉列表)。这样做将解决错误“检测到的数据更新xxx”,您的代码应该适用于增量更新。

呼延震博
2023-03-14

由于 readStream 仅适用于追加,因此源文件中的任何更改都会在下游产生问题。假设“raw_table”上的更新只会插入新的 parquet 文件是不正确的。根据“优化写入”等设置,甚至不带“优化写入”,apply_changes可以添加或删除文件。您可以在“raw_table/_delta_log/xxx.json”中的“num目标文件已添加”和“已删除目标文件”下找到此信息。

基本上,“Databricks建议您使用自动加载程序仅引入不可变文件”。

 类似资料:
  • 问题内容: 是否可以在带有子选择的mysql 5.0上运行UPDATE命令。 我要运行的命令是这样的: ISBN13当前存储为字符串。 这应该更新10k +行。 谢谢, 威廉 问题答案: 只需更改一下即可:

  • 问题内容: 我有2张table: 现在我需要一个函数或光标或任何会得到两个输入的东西,即要复制的文件夹和要复制的目标文件夹,该函数应将文件夹及其子文件夹复制到具有新ID和父ID的同一表中,如下所示当复制并插入文件夹时,文件表中的文件也将被插入,请帮助我获得以下结果。 如果我将folder5对应到folder3,我的输出应该是这样的: 当复制和插入文件夹时,文件表也将被更新,如下所示: 问题答案:

  • 问题内容: 我正在尝试使用同一表中不同行(和不同列)中的值更新表中的行。尽管我的语法没有任何结果,但与此类似:这是代码(已更新): 问题答案: 或者,简单地

  • 如何在Databricks中删除Delta表?我在文档中找不到任何信息……也许唯一的解决方案是使用magic命令或dbutils删除文件夹“delta”中的文件: 编辑: 为了澄清,我在这里举了一个非常基本的例子。 例子: 并将其保存在增量表中 然后,如果我尝试删除它……这是不可能的删除表或类似的行动 其他选项,如删除表“增量/测试表”等...

  • 我试图理解数据在DataBricks环境中是如何存储和管理的。我对引擎盖下发生的事情有一个相当不错的理解,但在网上看到了一些相互矛盾的信息,因此希望得到一个详细的解释来巩固我的理解。为了提出我的问题,我想总结一下我在Apache Spark开发人员课程中所做的部分练习。 作为练习的一部分,我在Databricks平台上遵循了以下步骤: 启动我的群集 将拼花文件读取为DataFrame 将DataF

  • 问题内容: 我想用另一个表中的数据更新mySql中的表。 我有两个表“ people”和“ business”。人员表通过称为“ business_id”的列链接到业务表。 必要的表结构,主键带有星号(表:列):人员: business_id, sort_order,email商业:* business_id,email 我想用人员表中的电子邮件更新业务表电子邮件列,如下所示(我知道我在这里遗漏了