管道每 20 分钟运行一次,以 ORC 格式将数据推送到 ADLS Gen2 存储。
event_readstream = (
spark.readStream
.format("orc")
.schema('my-schema.json')
.load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
input_df = (
df
.drop('address_line3')
.dropDuplicates(['primaryKey'])
.withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
)
input_df.createOrReplaceTempView("event_df_updates")
input_df._jdf.sparkSession().sql("""
MERGE INTO events dbEvent
USING event_df_updates dfEvent
ON dbEvent.primaryKey = dfEvent.primaryKey
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
event_writestream = (
event_readStream
.drop('adddress_line3') #some column to be dropped
.repartition(1)
.writeStream
.trigger(once=True)
.format("delta")
.option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path,mytable))
.foreachBatch(upsertEventToDeltaTable)
.start()
)
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
location_writestream = (
event_readstream # Same read stream is used here
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation)
.start()
)
问题:
在上述第2点中,不使用读取流(从orc文件中读取),而是使用如下的增量表路径创建一个新的读取流
deltatbl_event_readstream = spark.readStream.format("delta")
.load("/mnt/delta/myadlsaccnt/user_events") # my delta table location
def loadToLocation(df, batch_id):
(
df
.repartition(1)
.write
.partitionBy('updateddate')
.format("orc")
.mode("append")
.save('{}/event/data'.format(adls_mount_path))
)
deltatbl_event_readstream
.writeStream
.trigger(once=True)
.option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
.foreachBatch(loadToLocation) # re-using the same method.
.start()
我遇到了链接DATA AI峰会,其中有针对这种场景的演示。
在我的案例中,每一批都有
下面是类似于Alex Ott的回答,添加了其他信息
根据建议,如果批量更新更多,CDF可能无效。
%sql
ALTER table <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
table_changes()
函数查看更改%sql
select * from table_changes('<table-name>',<start-commit-version>,<end-commit-version>
event_read_stream = spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.table("event") #// table name
.filter("_change_type != 'update_preimage'")
event_read_stream.writeStream.format("delta")
.trigger(processingTime = "2 seconds") # if in case if job use once
.outputMode("append")
.start()
如果您只在增量表上使用纯 readStream
,而没有任何选项,则不会获得有关更新的信息。实际上,流将在更新后失败,直到您设置选项忽略更改
。这是因为Delta不跟踪更改,当您进行更新/删除时,它会重写现有文件,因此通过查看文件,您只能看到数据,而不知道它是插入还是更新。
但是,如果您需要从增量流式传输更改,则可以使用增量 8.4 中引入的增量更改数据馈送 (CDF) 功能(如果我没记错的话)。若要使其正常工作,需要通过将属性 delta.enableChangeDataFeed
设置为 true
,在源增量表上启用它。从该版本开始,您将能够阅读更改的源,如下所示:
deltatbl_event_readstream = spark.readStream.format("delta")\
.option("readChangeFeed", "true") \
.option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
.load("...")
这将添加三个额外的列来描述所执行的操作、Delta的版本和时间戳。如果只需要跟踪更改,只需要选择< code>_change_type列的值为< code > update _ posimage 的行,然后就可以将数据存储在任何需要的位置。
但请注意,在表上启用CDF后,其他客户端(DBR
我正在探索DataBricks Delta表及其时间旅行/时间特性。我有一些过去发生的事件数据。我正在尝试将它们插入delta表,并能够使用数据中的时间戳而不是实际的插入时间进行时间旅行。 我的事件中有一个日期/时间列。我将其重命名为“时间戳”,但它仍然不起作用。 我的 csv 数据如下所示:(数据显示 id=1000 的单个案例发生了 5 次更新) 我使用这些命令来创建增量表: 我有两个问题:
问题内容: 这是一个有点奇怪的请求,但我正在寻找一种方法来将列表写入文件,然后再读回去。 我没有办法重新制作列表,以使它们如下面的示例所示正确地形成/格式化。 我的列表具有如下数据: 问题答案: 如果您不需要它是人类可读/可编辑的,则最简单的解决方案是使用。 来写: 读书: 如果您 确实 需要使它们易于阅读,则我们需要更多信息。 如果保证是没有嵌入换行符的字符串列表,则只需每行写一个: 如果它们是
本文向大家介绍C#通过流写入数据到文件的方法,包括了C#通过流写入数据到文件的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#通过流写入数据到文件的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的C#程序设计有所帮助。
问题内容: 我正在尝试读写文件,但是我想通过Resource访问该文件。 这就是我要做的 但似乎都不正确。正确的方法是什么? 问题答案: 请尝试以下方法: 要么 对于输出,请尝试以下操作:
本文向大家介绍C#通过流写入一行数据到文件的方法,包括了C#通过流写入一行数据到文件的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#通过流写入一行数据到文件的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。
我有一个大问题,我自己创建了链表和数据结构,但数据读取功能工作非常慢。如果我尝试读取10k结构,函数需要大约530ms: 但是当我尝试读取10倍大的数据量(100k)时,大约需要44500毫秒: 这是我的代码: IQ_struct.h IQ_struct.cpp IQ_data.h IQ_data.cpp Main.cpp 我做错了什么?主要问题是我的文件包含超过5000K的结构。提前感谢您的帮助