我在Python中使用Apache Beam和Google Cloud Dataflow(2.3.0)。当将worker_machine_type
参数指定为例如n1-high mem-2
或Custal-1-6656
时,Dataflow运行作业,但始终为每个工作人员使用标准机器类型n1-ardard-1
。
有没有人知道我是否做错了什么?
其他主题(这里和这里)表明这应该是可能的,所以这可能是一个版本问题。
我用于指定PipelineOptions的代码(注意,所有其他选项都工作正常,因此它应该识别< code>worker_machine_type参数):
def get_cloud_pipeline_options(project):
options = {
'runner': 'DataflowRunner',
'job_name': ('converter-ml6-{}'.format(
datetime.now().strftime('%Y%m%d%H%M%S'))),
'staging_location': os.path.join(BUCKET, 'staging'),
'temp_location': os.path.join(BUCKET, 'tmp'),
'project': project,
'region': 'europe-west1',
'zone': 'europe-west1-d',
'autoscaling_algorithm': 'THROUGHPUT_BASED',
'save_main_session': True,
'setup_file': './setup.py',
'worker_machine_type': 'custom-1-6656',
'max_num_workers': 3,
}
return beam.pipeline.PipelineOptions(flags=[], **options)
def main(argv=None):
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_cloud_pipeline_options(args.project_id
pipeline = beam.Pipeline(options=pipeline_options)
在Apache Beam 2.8.0中对我有用的是通过将< code >-worker _ machine _ type 更改为< code> - machine_type(然后使用< code>machine_type作为参数的名称,如其他答案中所建议的)来更新这一行源代码。
这可以通过使用标志machine_type
而不是worker_machine_type
来解决。其余代码工作正常。
因此,文档中提到了错误的字段名称。
管道选项在
后台使用参数解析
其参数。对于计算机类型,参数的名称machine_type
但标志名称worker_machine_type
。这在以下两种情况下工作正常,其中 argparse 执行其解析并意识到此别名:
my_pipeline.py--worker_machine_typeCustal-1-6656
标志['--worker_machine_type','worker_machine_typeCustate-1-6656',…]
但是,它不适用于**kwargs
。以这种方式传递的任何其他参数都用于替换已知的参数名(但不是标志名)。
简而言之,使用<code>machine_type</code>可以在任何地方工作。我提出https://issues.apache.org/jira/browse/BEAM-4112以便将来将其固定在梁中。
我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中
storage和colab之间的链接如下所示:
我正在使用dataflow处理存储在GCS中的文件,并写入Bigquery表。以下是我的要求: 输入文件包含events记录,每个记录属于一个EventType; 需要按EventType对记录进行分区; 对于每个eventType输出/写入记录到相应的Bigquery表,每个eventType一个表。 每个批处理输入文件中的事件各不相同; 我正在考虑应用诸如“GroupByKey”和“Parti
我正在尝试使用Google云存储API,该API现已发布在App Engine网站的文档部分。文档指出,您必须将appengine服务帐户添加为API控制台中的团队成员。然而,我们在谷歌应用程序域中使用云存储,这只允许该域的用户作为团队成员添加。那么,不可能添加服务帐户(@appspot.gserviceaccount.com)吗?。有什么变通办法吗?
当我运行Dataflow作业时,它会将我的小程序包(setup.py或requirements.txt)上传到Dataflow实例上运行。 但是数据流实例上实际运行的是什么?我最近收到了一个stacktrace: 但从理论上讲,如果我在做,这意味着我可能没有运行这个Python补丁?你能指出这些作业正在运行的docker图像吗,这样我就可以知道我使用的是哪一版本的Python,并确保我没有在这里找
我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。