当前位置: 首页 > 知识库问答 >
问题:

使用add_value_provider_argument时数据流作业挂起

孙志
2023-03-14

我一直在运行基于12月创建的模板的数据流作业,该模板在运行时传递一些参数,没有任何问题。我现在不得不对模板做了一些修改,我似乎在生成一个工作模板时遇到了问题,即使使用和以前一样的beam代码/版本。我的工作只是无限期地挂起-尝试离开一个,大约一个小时后超时。

当然有一个问题,因为即使是我创建空PCollection的第一步也没有成功,它只是说运行。

我已经从函数中抽象出来,以解决问题可能是什么,因为日志中没有错误或奇怪之处。在非常精简的管道下面共享,一旦我注释掉管道中使用值提供程序参数的第二行和第三行,作业就会成功(创建一个空的PCollection)。

我使用的“add_value_provider_argument”非常接近此处的官方代码片段:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L554和https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#using-函数中的valueprovider

我从帕布罗这里借来的:https://stackoverflow.com/a/58327762/5687904

我甚至尝试在一个新的虚拟机中构建一个全新的环境,认为我的环境可能有一些东西破坏了模板而没有构建失败。

我尝试了 Dataflow SDK 2.15.0,这是原始模板以及 2.24.0(最新的模板)使用的内容。

真的很感激任何关于调试这个的想法,因为我开始绝望了。

import logging
import pandas as pd
import argparse
import datetime
#================ Apache beam  ======================

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import fileio
import io

#======================
PROJECT_ID = 'my-project'
GCS_STAGING_LOCATION = 'gs://my-bucket//gcs_staging_location/'
GCS_TMP_LOCATION = 'gs://my-bucket/gcs_tmp_location/' 
#======================================

# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    def __init__(self, files_bucket):
        self.files_bucket = files_bucket

    def process(self, element):
        files = pd.read_csv(str(element), header=None).values[0].tolist()
        bucket = self.files_bucket.get()
        files = [str(bucket) + '/' + file for file in files]
        logging.info('Files list is: {}'.format(files))
        return files

#=========================================================
# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam   
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp):
        self.vp = vp

    def process(self, unused_elm):
        yield self.vp.get()

#=========================================================
class RuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
          '--files_bucket',
          help='Bucket where the raw files are',
          type=str)
        
        parser.add_value_provider_argument(
          '--complete_batch',
          help='Text file with filenames in it location',
          type=str)

        parser.add_value_provider_argument(
          '--comp_table',
          required=False,
          help='BQ table to write to (dataset.table)',
          type=str)
#=========================================================

def run():
    #====================================
    # TODO PUT AS PARAMETERS 
    #====================================
    dt_now = datetime.datetime.now().strftime('%Y%m%d')

    job_name = 'dataflow-test-{}'.format(dt_now)

    pipeline_options_batch = PipelineOptions()
    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)
    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file  = './setup.py'
    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION
    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'
    pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
    pipeline_options_batch.view_as(SetupOptions).save_main_session = True
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']


    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
     
        try:
            final_data = (
            pipeline_2
            |'Create empty PCollection' >> beam.Create([None])
            |'Get accepted batch file'>> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
            # |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
            )
        except Exception as exception:
            logging.error(exception)
            pass
#=========================================================

if __name__ == "__main__":
    run()

共有1个答案

轩辕晔
2023-03-14

当您创建模板时,使用的Apache Beam SDK似乎与setup.py文件中的包版本前向兼容,并且工作正常;但是,当您执行更新时,SDK版本可能无法与setup.py.中列出的相同版本前向兼容

基于这个文档,Apache Beam SDK和Dataflow workers必须具有向前兼容的库,以避免版本冲突,这可能导致服务中的意外行为。

为了了解每个Apache Beam SDK版本中所需的包版本,请看一下这个页面。

 类似资料:
  • 我已经开始使用Scala SDK Scio开发我的第一个DataFlow工作。数据流作业将以流模式运行。 有谁能建议最好的部署方法吗?我已经在Scio文档中阅读了他们使用的,然后将其部署到Docker容器中。我也读过关于使用数据流模板的内容(但不是很详细)。 什么是最好的?

  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?