当前位置: 首页 > 面试题库 >

Google Cloud Dataflow Python,检索作业ID

阎令
2023-03-14
问题内容

我目前正在使用Python处理数据流 模板 ,我想访问作业ID并将其保存到特定的Firestore文档。

是否可以访问作业ID?

我在文档中找不到与此有关的任何内容。


问题答案:

您可以通过dataflow.projects().locations().jobs().list在管道中进行调用来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称来调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前缀,如果包含名称前缀,则返回该作业ID。如果有多个,它将​​仅返回最新的一个(当前正在运行的一个)。

在定义PROJECTBUCKET变量之后,使用以下命令暂存该模板:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id

然后,myjobprefix在执行模板化作业时指定所需的作业名称(在我的情况下):

gcloud dataflow jobs run myjobprefix \
   --gcs-location gs://$BUCKET/templates/retrieve_job_id

retrieve_job_id函数将从作业中返回作业ID,将更job_prefix改为与给定名称匹配。

import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
  project = 'PROJECT_ID'
  job_prefix = "myjobprefix"
  location = 'us-central1'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    for job in result['jobs']:
      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.info("Error retrieving Job ID")
    raise KeyError(e)


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | 'Start' >> beam.Create(["Init pipeline"])
               | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))

  p.run()


if __name__ == '__main__':
  run()


 类似资料:
  • 直接从HDFS读取文件,而不将其复制到本地文件系统。不过,我将结果复制到本地文件系统。 hduser@ubuntu:/usr/local/hadoop$mkdir/tmp/gutenberg-output bin/hadoop dfs-getmerge/user/hduser/gutenberg-output/tmp/gutenberg-output deprecated:不推荐使用此脚本执行hd

  • 我是使用AWS Glue的新手,我不明白ETL作业是如何收集数据的。我使用爬虫从S3存储桶中的一些文件生成我的表模式,并检查了ETL作业中的自动生成脚本,如下所示(略有修改): 当我运行此作业时,它成功地从我的爬虫用于生成表模式的存储桶中获取我的数据,并按预期将数据放入我的目标s3存储桶中。 我的问题是:可以说,我在这个脚本中看不到任何地方“加载”了数据。我知道我把它指向了爬虫程序生成的表,但从这

  • 问题内容: 如果我知道作业ID,有什么方法可以检索作业配置(配置中的某些属性)? 基本上,我正在做的是检查当前是否有任何正在运行的作业,然后我要检查当前正在运行的任何作业中是否存在某些属性值? 用于检索当前正在运行的作业的部分代码: 问题答案: 您可以在作业跟踪器中查看正在运行的作业的配置,该配置通常在端口50030上运行。

  • 我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥

  • 我正在使用Apache Flink RabbitMQ堆栈。我知道有机会手动触发保存点并从中还原作业,但问题是Flink会在成功的检查点之后确认消息,如果要使保存点和还原状态为,则会丢失上次成功的保存点和上次成功的检查点之间的所有数据。有没有办法从检查点恢复作业?这将解决在不可重放数据源(如rabbitmq)的情况下丢失数据的问题。顺便说一句,如果我们有检查点及其所有开销,为什么不让用户使用它们呢?

  • 我想检查Quartz作业是否正在运行。我发现它可以使用调度程序。getCurrentlyExecutingJobs()。但是我对此感到困惑,我应该把它放在哪里才能得到结果呢?谢谢