当前位置: 首页 > 知识库问答 >
问题:

如何添加可追溯性列自动加载器 - adf集成?

强安和
2023-03-14

我正在使用 Azure 数据工厂将源数据复制到登陆区域 (adls gen2),然后使用自动加载程序加载到青铜色增量表中。一切都很完美,除了我无法将pipeline_name,runid和trigger_time派生为镶木地板文件中的派生列以及输入源表。

架构此处是使用实际源 sql Server 表架构构建的结构类型,它不涵盖 ADF 中的其他派生列。

sourceFilePath = 'wasbs://landing-zone@dlslandingzonedev.blob.core.windows.net/' \
   + domain_name + '/' + database_name + '/' \
   + schema_table_name.replace(database_name+'.','') + '/'
df = (spark
     .readStream
     .format("cloudFiles")
     .options(**cloudFile)
     .schema(schema)
     .option("rescueDataColumn", "_rescued_data")
     .load(sourceFilePath)
     )
# Traceability columns
# from pyspark.sql.functions import *
df = (
  df.withColumn("audit_fileName", input_file_name()) 
    .withColumn("audit_createdTimestamp", current_timestamp())
)

这是写流DF

streamQuery = (df
           .writeStream
           .format("delta")
           .outputMode("append")
           .trigger(once=True)
           .queryName(queryName)
           .option("checkpointLocation",checkpointLocation)
           .option("mergeSchema", "true")
           .start(tablePath)
          )

使用 mergeSchema True - 我期望流在写入增量格式时从数据工厂检测到 3 个额外的列。这是镶木地板的限制吗?我是否以 csv / json 格式读取数据?或者我必须添加派生列架构定义。

共有2个答案

毛德华
2023-03-14

我的经验是,当您在readStream中添加列时,模式演化已经完成。我使用的做法是用这些列为表添加种子,并让模式演化处理传入数据。

CREATE Table IF NOT EXISTS db.tbl
(
  file_path               String,
  created_timestamp       Timestamp,
  last_modified_timestamp Timestamp,
  created_userid          String,
  _rescued_data           String
)
USING DELTA...
赖星驰
2023-03-14

您可以在readStream命令中添加审核字段:

from pyspark.sql import functions as F

sourceFilePath = 'wasbs://landing-zone@dlslandingzonedev.blob.core.windows.net/' \
   + domain_name + '/' + database_name + '/' \
   + schema_table_name.replace(database_name+'.','') + '/'
df = (spark
     .readStream
     .format("cloudFiles")
     .options(**cloudFile)
     .schema(schema)
     .option("rescueDataColumn", "_rescued_data")
     .load(sourceFilePath)
     .withColumn("audit_fileName", input_file_name()) 
     .withColumn("audit_createdTimestamp", current_timestamp())
)
# Just displaying the dataframe with the audit columns:
df.display()
 类似资料:
  • 当用户从用户界面更改配置时,我想动态地重新加载log4j附加器(RollingFileAppender)。 我已经通过编程删除了追加器,并用新的配置值创建了新的追加器。在此之后,appender broked MaxBackupIndex和MaxFileSize无法正常工作。但是如果我更改了文件名(日志文件名),那么它可以正常工作。 能帮我解决这个问题吗?

  • 问题内容: 有没有一种方法可以修改.class文件,以便向某些方法添加Java注释?基本上,我想遍历jar文件中每个类文件的方法并注释某些方法。请注意,使用jar文件时,这不是在运行时。相反,完成后,我想使用注释修改类文件。 我确实可以访问源代码,因此,如果有一个自动的源代码修改器,那也可以工作… 我假设我需要Javassist或ASM之类的工具。如果是这样,我应该使用哪一个,我将如何处理? 问题

  • 问题内容: 我创建了一个带有JTextArea的JFrame。我想在每次追加后自动向下滚动文本区域。我应该如何处理? 我已经尝试过,但是什么都没有改变。 问题答案: 有两种方法(但必须将JTextArea放在JScrollPane中) a)设置插入符号(方法正确) 例如 b)从(从)移至最大值

  • 我有一个java应用程序,它有三个“形上说”的对象。。。1类动物,1类食物,这些与任何遗传或接口无关。。班级经理的最后一个任务是列出动物和食物的清单,经理负责动物园里的动物和食物。。 说到点子上。。。 我正在使用log4j,我需要登录到一个txt文件,如果并且仅当动物列表中的某些内容发生变化。。。(动物死了,出生了,或者什么的…)我需要登录系统。当且仅当食物清单中的某些东西发生变化时。。。(需要新

  • 我正在尝试通过代码添加一个自定义的附加器,该附加器应该记录一些包。所有的工作都使用以下代码: 所以简而言之..正如您所看到的,如果之前没有定义追加器,我正在创建一个追加器。然后我为org.test创建一个记录器(如果没有添加的话),并将appender添加到这个记录器中。 多谢了。