当前位置: 首页 > 知识库问答 >
问题:

如何使用“触发一次”触发器控制 Spark 结构化流式处理中每个触发器正在处理的文件量?

满雨石
2023-03-14

我正在尝试使用Spark结构化流的功能,触发一次,来模拟一个类似的批处理设置。然而,当我运行我的初始批处理时,我遇到了一些麻烦,因为我有很多历史数据,因此我也使用了这个选项。选项(" cloud files . includeexistingfiles "," true ")也处理现有文件。

因此,我的初始批处理变得非常大,因为我无法控制批处理的文件量。

我也尝试过使用选项 cloudFiles.maxBytes触发器,但是,当您使用触发器一次时,这被忽略 -

当我指定最大文件触发器选项时,它也被忽略。它只接受所有可用的文件。

我的代码如下所示:

df = (
  spark.readStream.format("cloudFiles")
    .schema(schemaAsStruct)
    .option("cloudFiles.format", sourceFormat)
    .option("delimiter", delimiter)
    .option("header", sourceFirstRowIsHeader)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("badRecordsPath", badRecordsPath)
    .option("maxFilesPerTrigger", 1)
    .option("cloudFiles.resourceGroup", omitted)
    .option("cloudFiles.region", omitted)
    .option("cloudFiles.connectionString", omitted)
    .option("cloudFiles.subscriptionId", omitted)
    .option("cloudFiles.tenantId", omitted)
    .option("cloudFiles.clientId", omitted)
    .option("cloudFiles.clientSecret", omitted)
    .load(sourceBasePath)
)

# Traceability columns
df = (
  df.withColumn(sourceFilenameColumnName, input_file_name()) 
    .withColumn(processedTimestampColumnName, lit(processedTimestamp))
    .withColumn(batchIdColumnName, lit(batchId))
)

def process_batch(batchDF, id):
  batchDF.persist()
  
  (batchDF
     .write
     .format(destinationFormat)
     .mode("append")
     .save(destinationBasePath + processedTimestampColumnName + "=" +  processedTimestamp)
  )
    
  (batchDF
   .groupBy(sourceFilenameColumnName, processedTimestampColumnName)
   .count()
   .write
   .format(destinationFormat)
   .mode("append")
   .save(batchSourceFilenamesTmpDir))
  
  batchDF.unpersist()

stream = (
  df.writeStream
    .foreachBatch(process_batch)
    .trigger(once=True)
    .option("checkpointLocation", checkpointPath)
    .start()
)

如你所见,我使用的是cloudfiles格式,这是Databricks自动加载器的格式

“Auto Loader在新数据文件到达云存储时增量高效地处理这些文件。

自动加载程序提供了一个名为 cloud 文件的结构化流式处理源。给定云文件存储上的输入目录路径,cloudFiles 源会在新文件到达时自动处理它们,还可以选择处理该目录中的现有文件”

如果我以某种方式以令人困惑的方式提出我的问题,或者它缺乏信息,请说出来。

共有1个答案

云昊阳
2023-03-14

不幸的是,火花 3.x (DBR

要解决这个问题,您可以定期检查< code > stream . get(' numInputRows ')的值,如果它在一段时间内等于0,则发出< code>stream.stop()

更新,2021 年 10 月:看起来它将在 Spark 3.3 中通过引入新的触发器类型 - Trigger.AvailableNow 进行修复(请参阅 SPARK-36533)

 类似资料:
  • 我们有一个quartz应用程序可以在其他环境中很好地工作。 触发器表也显示它从未被激发。 NEXT_FIRE_TIME PREV_FIRE_TIME优先级TRIGGER_STATE TRIGGER_TYPE START_TIME END_TIME CALENDAR_NAME MISFIRE_INSTR 1405630189133-1 5等待CRON 1405624813000 0 140 5630

  • 我有一个 Blob 存储容器,其中配置了事件网格触发器(Blob 已创建)。我正在通过数据工厂加载此 blob 存储文件,很多时候,许多文件可能会在一次尝试中出现在此 blob 中。也许我们可以举一个20个文件的例子。 好消息是我的事件网格触发器启动了,函数app被调用。然而,我发现有时对于同一个文件,事件网格触发器被触发了不止一次。 在这20个文件中,很少有文件非常大,比如300 MB,但其他文

  • 我想用Python脚本自动执行一些任务。基本应用程序是用 Java 编写的。触发我使用的Python内容 所以起初我只是尝试在python脚本上创建一个目录: 如果我将新目录的路径作为绝对路径传递,例如 它工作正常 如果我通过它相对像 它不起作用。(如果我直接在python脚本中输入相对路径的字符串,它就会工作。) 如果任务从Java应用程序开始,日志文件log.txt永远不会写入。 如果我只是用

  • 我正在尝试将连续触发器与 Spark 结构化流式处理查询结合使用。我得到的错误是,火花消费者在处理数据时找不到适当的偏移量。如果没有此触发器,查询将正常运行(如预期)。 我的工作: 从Kafka主题读取数据: 将数据写入Kafka主题: 所以我基本上没有做什么特别的事情——只是将输入数据传输到输出主题,而没有任何转换或无效操作。 我得到了什么: 在executor日志中,我看到很多这样的消息: 尽

  • 我有一张如下的桌子 数据将从AWS Lambda函数一致地插入到此表中。(可能是数百万个项目) 我有一个用例,当表中有100个可用项时,我需要有一个触发器来执行一些批处理。换句话说,一旦我们在这个表中创建了100个新项目,我希望有一个Lambda触发器函数来对100个项目执行批处理。 当我研究时,DynamoDB流似乎可以支持批处理,但根据文档我不太清楚。 Lambda分批读取记录,并调用函数处理

  • 我只是想到了我解决每24小时运行一个任务的服务的方式,以及DST如何可能伤害它。 为了每天运行任务,我使用了System.线程。一个周期为24小时的计时器,像这样: 突然想到夏令时修正,我有三个想法: DST是无用的,我们应该把它扔掉 计时器处理得正确吗?我认为不管时钟是否改变,它都不会等待24小时。它只是等待指定的时间段,然后再次调用TimerCallback DST是无用的,我们真的应该摆脱它