当前位置: 首页 > 知识库问答 >
问题:

AWS胶水-从作业内部访问工作流参数

姜淇
2023-03-14

如何从胶水作业中检索胶水工作流参数?

我有一个“Python Shell”类型的AWS胶水作业,它从胶水工作流中定期触发。

该作业的代码将在大量不同的工作流中重用,因此我希望检索工作流参数,以消除对冗余作业的需求。

AWS开发人员指南提供了以下教程:https://docs.AWS.amazon.com/glue/latest/dg/workflow-run-properties-code.html

但是我没有成功地使html" target="_blank">示例代码在不触发错误的情况下执行。我怀疑这个例子可能只适用于Scala/pyspark作业,而不适用于python shell作业。

我在相关作业中尝试了以下代码

import sys
import boto3
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME','WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']
workflow_params = glue_client.get_workflow_run_properties(Name=workflow_name,
                                    RunId=workflow_run_id)["RunProperties"]

print(workflow_name, workflow_run_id, workflow_params)

当我按需触发工作流时,我会收到以下错误消息:

> Traceback (most recent call last):
> File "/tmp/runscript.py", line 115, in <module>
> runpy.run_path(temp_file_path, run_name='__main__')
> File "/usr/local/lib/python3.6/runpy.py", line 263, in run_path
> pkg_name=pkg_name, script_name=fname)
> File "/usr/local/lib/python3.6/runpy.py", line 96, in _run_module_code
> mod_name, mod_spec, pkg_name, script_name)
> File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
> exec(code, run_globals)
> File "/tmp/glue-python-scripts-w4fbwl3n/map_etl_python_shell_test_env.py", line 10, in <module>
> File "/glue/lib/awsglue/utils.py", line 10, in getResolvedOptions
> parsed, extra = parser.parse_known_args(args)
> File "/usr/local/lib/python3.6/argparse.py", line 1766, in parse_known_args
> namespace, args = self._parse_known_args(args, namespace)
> File "/usr/local/lib/python3.6/argparse.py", line 2001, in _parse_known_args
', '.join(required_actions))
> File "/usr/local/lib/python3.6/argparse.py", line 2393, in error
> self.exit(2, _('%(prog)s: error: %(message)s\n') % args)
> File "/usr/local/lib/python3.6/argparse.py", line 2380, in exit
> _sys.exit(status)
> SystemExit: 2
> 
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
> File "/tmp/runscript.py", line 134, in <module>
> raise e_type(e_value).with_tracsback(new_stack)
> AttributeError: 'SystemExit' object has no attribute 'with_tracsback'

共有1个答案

邹德泽
2023-03-14

code>boto3库为您提供了一个有趣的功能

glue = boto3.client(service_name='glue', region_name="my-region")
job = glue.get_job(JobName="my-job-name")

default_parameters = job['Job']['DefaultArguments']
default_parameters[u'--my-parameter']

通过这种方式,您应该能够通过default_parameter操作胶水作业参数。我不确定它能直接在Glue作业中工作,但外部脚本应该能够处理Glue作业参数。

 类似资料:
  • 我正在使用AWS Glue爬行器来爬行大约170 GB的avro数据,以创建一个数据目录表。 avro数据中有几个不同的模式版本,但爬虫程序仍然能够将数据合并到一个表中(我启用了“按数据兼容性和模式相似性分组-模式”)。 这就是事情出现问题的时候。 我只能使用雅典娜从 一个简短的Google检查让我相信这与avro文件中的模式有关。 通常,这是我集中精力的地方,但是:我已经能够做完全相同的程序(A

  • 我每天都有csv文件被传递到S3,这些文件在当月是增量的。所以file1包含第1天的数据,file2包含第1天和第2天的数据,等等。每天我都想对该数据运行一个ETL并将其写入不同的S3位置,这样我就可以使用Athena查询它,而不会出现重复的行。本质上,我只想查询聚合数据的最新状态(这只是最近交付给S3的文件的内容)。 我认为书签不会起作用,因为增量交付包含以前文件中的数据,因此会产生重复。我知道

  • 我正在尝试使用AWS胶水将大约1.5 GB的Gzip CSV转换为拼花地板。下面的脚本是自动生成的粘合作业,用于完成该任务。这似乎需要很长时间(我已经等了10个dpu好几个小时了,从来没有看到它结束或产生任何输出数据) 我想知道是否有人有任何经验将1.5 GB GZIPPED CSV转换为镶木地板-是否有更好的方法来完成此转换? 我有TB的数据要转换。值得关注的是,转换GBs似乎需要很长时间。 我

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 我有一个数据帧。我需要将每个记录转换为JSON,然后使用JSON负载调用API将数据插入postgress。我在数据框中有14000条记录,要调用api并获得响应,需要5个小时。有没有办法提高性能。下面是我的代码片段。 注意:我知道通过做"json_insert=df_insert.toJSON()。收集()"我正在失去数据帧的优势。有没有更好的方法来完成。

  • 我有以下Spring批处理作业配置: 我用以下代码开始这项工作: 如何从作业步骤访问参数?