当前位置: 首页 > 知识库问答 >
问题:

数据砖 - 仅通过更改从增量表写入到 orc 文件的读取流

阴永逸
2023-03-14

管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。

    < li >我有一个每1小时运行一次的Azure Databricks笔记本作业。该作业从ADLS以结构化流的形式读取orc文件(由上述管道创建的orc文件),然后使用合并功能根据主键列将数据向上插入增量表。
event_readstream = ( 
    spark.readStream
    .format("orc")
    .schema('my-schema.json')
    .load('/mnt/path/from/adls/to/orc/files/')
  )
  ...

def upsertEventToDeltaTable(df, batch_id): 
  input_df = (
    df
    .drop('address_line3')
    .dropDuplicates(['primaryKey'])
    .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
  )
  
  input_df.createOrReplaceTempView("event_df_updates")
  
  input_df._jdf.sparkSession().sql("""
    MERGE INTO events dbEvent
    USING event_df_updates dfEvent
    ON dbEvent.primaryKey = dfEvent.primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *                     
                             """)

event_writestream = ( 
    event_readStream
    .drop('adddress_line3')  #some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
  )

def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

location_writestream = ( 
  event_readstream   # Same read stream is used here 
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)
  .start()
)

问题:

  • 在我的情况下,如果我创建新的readstream(从delta表读取)和writestream(写入ORC文件),Delta表每1小时更新一次数据。ORC文件是否只包含在delta表中合并的更改?[详情如下]
  • 如果只将更改或更新的数据写入ORC文件,这种方法是否有任何问题?

在上述第2点中,不使用读取流(从orc文件中读取),而是使用如下的增量表路径创建一个新的读取流

deltatbl_event_readstream = spark.readStream.format("delta")
  .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
  • 并使用不同的写入流,如下所示
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

deltatbl_event_readstream
   .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)  # re-using the same method.
  .start()

共有2个答案

姚向晨
2023-03-14

我遇到了链接DATA AI峰会,其中有针对这种场景的演示。

在我的案例中,每一批都有

下面是类似于Alex Ott的回答,添加了其他信息

根据建议,如果批量更新更多,CDF可能无效。

  • 要启用CDF功能:
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
  • 对表执行任何更新/插入操作
  • 使用table_changes()函数查看更改
%sql 
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
    < li >作为流读取
event_read_stream = spark.readStream
     .format("delta")
     .option("readChangeFeed", "true")
     .option("startingVersion", "latest")
     .table("event") #// table name 
     .filter("_change_type != 'update_preimage'")
  • 创建upsert函数以合并更改
  • 写入流以写入信息
event_read_stream.writeStream.format("delta")
  .trigger(processingTime = "2 seconds") # if in case if job use once
  .outputMode("append")
  .start()
阙繁
2023-03-14

如果您只在增量表上使用纯 readStream,而没有任何选项,则不会获得有关更新的信息。实际上,流将在更新后失败,直到您设置选项忽略更改。这是因为Delta不跟踪更改,当您进行更新/删除时,它会重写现有文件,因此通过查看文件,您只能看到数据,而不知道它是插入还是更新。

但是,如果您需要从增量流式传输更改,则可以使用增量 8.4 中引入的增量更改数据馈送 (CDF) 功能(如果我没记错的话)。若要使其正常工作,需要通过将属性 delta.enableChangeDataFeed 设置为 true,在源增量表上启用它。从该版本开始,您将能够阅读更改的源,如下所示:

deltatbl_event_readstream = spark.readStream.format("delta")\
  .option("readChangeFeed", "true") \
  .option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
  .load("...")

这将添加三个额外的列来描述所执行的操作、Delta的版本和时间戳。如果只需要跟踪更改,只需要选择< code>_change_type列的值为< code > update _ posimage 的行,然后就可以将数据存储在任何需要的位置。

但请注意,在表上启用CDF后,其他客户端(DBR

 类似资料:
  • 我正在探索DataBricks Delta表及其时间旅行/时间特性。我有一些过去发生的事件数据。我正在尝试将它们插入delta表,并能够使用数据中的时间戳而不是实际的插入时间进行时间旅行。 我的事件中有一个日期/时间列。我将其重命名为“时间戳”,但它仍然不起作用。 我的 csv 数据如下所示:(数据显示 id=1000 的单个案例发生了 5 次更新) 我使用这些命令来创建增量表: 我有两个问题:

  • 问题内容: 这是一个有点奇怪的请求,但我正在寻找一种方法来将列表写入文件,然后再读回去。 我没有办法重新制作列表,以使它们如下面的示例所示正确地形成/格式化。 我的列表具有如下数据: 问题答案: 如果您不需要它是人类可读/可编辑的,则最简单的解决方案是使用。 来写: 读书: 如果您 确实 需要使它们易于阅读,则我们需要更多信息。 如果保证是没有嵌入换行符的字符串列表,则只需每行写一个: 如果它们是

  • 本文向大家介绍C#通过流写入数据到文件的方法,包括了C#通过流写入数据到文件的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#通过流写入数据到文件的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的C#程序设计有所帮助。

  • 问题内容: 我正在尝试读写文件,但是我想通过Resource访问该文件。 这就是我要做的 但似乎都不正确。正确的方法是什么? 问题答案: 请尝试以下方法: 要么 对于输出,请尝试以下操作:

  • 本文向大家介绍C#通过流写入一行数据到文件的方法,包括了C#通过流写入一行数据到文件的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#通过流写入一行数据到文件的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。

  • 我有一个大问题,我自己创建了链表和数据结构,但数据读取功能工作非常慢。如果我尝试读取10k结构,函数需要大约530ms: 但是当我尝试读取10倍大的数据量(100k)时,大约需要44500毫秒: 这是我的代码: IQ_struct.h IQ_struct.cpp IQ_data.h IQ_data.cpp Main.cpp 我做错了什么?主要问题是我的文件包含超过5000K的结构。提前感谢您的帮助