当前位置: 首页 > 知识库问答 >
问题:

如何正确使用Dataflow/apachebeam wait_until_finish duration参数?

仉昱
2023-03-14

我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实现了运行函数如下:

def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)

    (p |
     "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,
                                                        use_standard_sql=StaticValueProvider(bool, True))) |
     "Get data" >> beam.ParDo(doStuff()) |

     "Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |

     "Write to BQ" >> beam.io.WriteToBigQuery(
                table=user_options.table_spec,
                schema=user_options.table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
     )

    result = p.run()

    result.wait_until_finish(duration=1800000)

共有1个答案

东方俊材
2023-03-14

不,数据流不提供特定时间段后的自动取消。你仍然可以通过简单的取消来实现你的目标

    result.wait_until_finish(duration=1800000)
    if not result.is_in_terminal_state():   # if pipeline isn't finished, cancel
      result.cancel()
 类似资料:
  • 问题内容: 我只想检索UserAccount类中的某些列,所以我有以下代码: 我得到了空值作为回报。但是,如果我注释掉setProjections,我将获得具有所有属性的用户。在这种情况下,如何正确使用setProjection? 问题答案: 它返回一个Object数组,因此代码应为:

  • 问题内容: 我不知道我在哪里错了:/。当我运行这段代码时,我得到的只是一个空白元素。我似乎无法让insertRule方法执行任何操作(甚至不会产生错误)。我想念什么吗? 问题答案: 这有点令人困惑,但是您的代码确实可以工作,只是您看不到返回的XML树中插入的规则。 为了验证您的代码是否有效,您可以执行两个测试: 运行上面的代码片段,您可以看到CSS规则确实适用。并且属性也在控制台中更改。 当浏览器

  • 问题内容: 如何使用从类路径中查找递归资源? 例如 在“目录”中查找所有资源:想象一下 不幸的是,这只会检索到恰好该“目录”。 所有资源都已命名(递归) 但这返回一个空。 还有一个额外的问题:与有什么不同? 问题答案: 没有办法递归搜索类路径。您需要知道资源的完整路径名才能以这种方式检索它。该资源可能位于文件系统中的目录中,也可能位于jar文件中,因此它不像执行“类路径”的目录列表那样简单。您将需

  • 问题内容: 我最近开始使用ScriptManager。我有一个通过JavaScript填充的ASP.NET DropDownList控件。但是,我正在使用事件验证。因此,如果我不使用下拉菜单中的“ RegisterForEventValidation”调用,则会遇到以下错误。我怎么知道在第二个参数中设置什么值(我有“值”)?我正在通过JavaScript填充下拉列表,因此我不知道后面的代码中包含哪

  • 我正在尝试正确地使用ByteBuffer和BigEndian字节顺序格式。。 我有几个字段,我试图把它存储在Cassandra数据库之前放在一个单一的ByteBuffer中。 我将要写入Cassandra的字节数组由三个字节数组组成,如下所述- 现在,我将写,和一起到一个字节数组和由此产生的字节数组我将写入Cassandra,然后我将有我的C程序来检索它字节数组数据从Cassandra,然后反序列

  • 问题内容: 我在用 : Node.js Express 4.0 Passport.js Google OAuth 2身份验证 对于每个用户,我将其Google个人资料中的一些信息(电子邮件等…)存储在MySQL数据库中(我对此技术没有选择),访问和刷新令牌以及用户在提供信息时提供的其他信息他在我的应用上注册。 我已经看到了password.js的不同用法,特别是关于该信息在会话中的存储方式。 在p