当前位置: 首页 > 知识库问答 >
问题:

数据流批处理作业步骤完成后运行函数

葛鸿轩
2023-03-14

我知道我可以用云函数和PubSub通知来完成每个写入的文件,但我更喜欢只在整个文件夹完成时这样做一次。

谢了!

共有1个答案

苏宏逸
2023-03-14

有两种方法可以做到这一点:

运行管道,并在管道结果上调用waituntilfinish(Python中的wait_until_finish)将执行延迟到管道完成之后,如下所示:

pipeline.run().waituntilfinish();

您可以根据WaitIntilFinish的结果验证管道是否成功完成,然后从那里将文件夹的内容加载到BigQuery。这种方法唯一需要注意的是,您的代码不是数据流管道的一部分,因此如果您依赖管道中的元素来完成该步骤,那么这将更加困难。

Write转换的结果是一个WriteFilesResult,它允许您通过调用GetPerdestinationOutputFileNames来获取包含所写文件的所有文件名的PCollection。从那里,您可以继续使用可以将所有这些文件写入BigQuery的转换管道。下面是Java中的一个示例:

WriteFilesResult<DestinationT> result = files.apply(FileIO.write()...)
result.getPerDestinationOutputFilenames().apply(...)

Python中的等价物似乎称为fileResult,但我找不到这方面的好文档。

 类似资料:
  • 我正在尝试在Spring批处理中并行运行多个作业。在谷歌上搜索了很多之后,我遇到了JobStep。有没有人使用过JobStep可以解释如何使用它来并行运行作业,或者有没有其他方法可以并行运行2个独立的作业,即当我启动批处理时,2个作业应该开始并行运行。我的要求就像 当我的应用程序启动时,两个作业都应该开始运行。使用spring batch是否可以这样做 编辑:我甚至试过这种方法 我面临着例外。sp

  • 你可以点击 “设置任务计划”来为一个批处理作业设置计划和点击 “删除任务计划”来移除计划。 如果你在“常规”选项卡选择“不管用户是否登录都要运行”,当你保存计划时你必须在 Windows 计划程序提供你的操作系统用户密码。 【注意】请在设置计划之前保存批处理作业。在运行计划之前,在连接窗口内的密码必须保存。

  • 添加工作到批处理作业 在“常规”选项卡的底部窗格中,选择工作类型,然后如有需要浏览连接、数据库和/或模式以找出你想运行的工作。 你可以双击或拖放工作来将工作从“可用的工作”列表移动到“已选择的工作”列表。若要从已选择的工作列表删除工作,请以相同的方式移除它们。你可以在一个批处理作业中运行来自不同服务器的配置文件。 若要重新排序工作的序列,可使用 “上移”或 “下移”按钮。 如果你想备份整个服务器,

  • 你可以点击 来为一个批处理作业设置计划和点击 来移除计划。 “小时”和“分钟”字段必需指定。如果字段没有任何值,将会使用全部值。例如,如果“星期”字段是空的,系统会视为这个字段被输入“0, 1, 2, 3, 4, 5, 6”。使用逗号以分隔值。例如,0, 1, 3, 6。使用连字号 (-),不含空格以指示值。例如,0-4。 例子:批处理作业将会在每个工作日下午 6:30 运行。 【注意】请在设置计

  • 添加工作到批处理作业 在“常规”选项卡的底部窗格中,选择工作类型,然后如有需要浏览连接、数据库和/或模式以找出你想运行的工作。 你可以点击 或 来将已选择的工作或所有工作从“可用的工作”列表移动到“已选择的工作”列表。若要从已选择的工作列表删除已选择工作或所有工作,请点击 或 。你可以在一个批处理作业中运行来自不同服务器的配置文件。 若要重新排序工作的序列,可在已选择的工作列表中拖拉工作到所需的位

  • 你可以点击 “设置任务计划”来为一个批处理作业设置计划和点击 “删除任务计划”来移除计划。 “小时”和“分钟”字段必需指定。如果字段沒有任何值,将会使用全部值。例如,如果“星期”字段是空的,系统会视为这个字段被输入“0, 1, 2, 3, 4, 5, 6”。使用逗号以分隔值。例如,0, 1, 3, 6。使用连字号 (-),不含空格以指示值。例如,0-4。 例子:批处理作业将会在每个工作日下午 6: