我一直在运行基于12月创建的模板的数据流作业,该模板在运行时传递一些参数,没有任何问题。我现在不得不对模板做了一些修改,我似乎在生成一个工作模板时遇到了问题,即使使用和以前一样的beam代码/版本。我的工作只是无限期地挂起-尝试离开一个,大约一个小时后超时。
当然有一个问题,因为即使是我创建空PCollection的第一步也没有成功,它只是说运行。
我已经从函数中抽象出来,以解决问题可能是什么,因为日志中没有错误或奇怪之处。在非常精简的管道下面共享,一旦我注释掉管道中使用值提供程序参数的第二行和第三行,作业就会成功(创建一个空的PCollection)。
我使用的“add_value_provider_argument”非常接近此处的官方代码片段:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L554和https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#using-函数中的valueprovider
我从帕布罗这里借来的:https://stackoverflow.com/a/58327762/5687904
我甚至尝试在一个新的虚拟机中构建一个全新的环境,认为我的环境可能有一些东西破坏了模板而没有构建失败。
我尝试了 Dataflow SDK 2.15.0,这是原始模板以及 2.24.0(最新的模板)使用的内容。
真的很感激任何关于调试这个的想法,因为我开始绝望了。
import logging
import pandas as pd
import argparse
import datetime
#================ Apache beam ======================
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import fileio
import io
#======================
PROJECT_ID = 'my-project'
GCS_STAGING_LOCATION = 'gs://my-bucket//gcs_staging_location/'
GCS_TMP_LOCATION = 'gs://my-bucket/gcs_tmp_location/'
#======================================
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
def __init__(self, files_bucket):
self.files_bucket = files_bucket
def process(self, element):
files = pd.read_csv(str(element), header=None).values[0].tolist()
bucket = self.files_bucket.get()
files = [str(bucket) + '/' + file for file in files]
logging.info('Files list is: {}'.format(files))
return files
#=========================================================
# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam
class OutputValueProviderFn(beam.DoFn):
def __init__(self, vp):
self.vp = vp
def process(self, unused_elm):
yield self.vp.get()
#=========================================================
class RuntimeOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--files_bucket',
help='Bucket where the raw files are',
type=str)
parser.add_value_provider_argument(
'--complete_batch',
help='Text file with filenames in it location',
type=str)
parser.add_value_provider_argument(
'--comp_table',
required=False,
help='BQ table to write to (dataset.table)',
type=str)
#=========================================================
def run():
#====================================
# TODO PUT AS PARAMETERS
#====================================
dt_now = datetime.datetime.now().strftime('%Y%m%d')
job_name = 'dataflow-test-{}'.format(dt_now)
pipeline_options_batch = PipelineOptions()
runtime_options = pipeline_options_batch.view_as(RuntimeOptions)
setup_options = pipeline_options_batch.view_as(SetupOptions)
setup_options.setup_file = './setup.py'
google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'
pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
pipeline_options_batch.view_as(SetupOptions).save_main_session = True
pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:
try:
final_data = (
pipeline_2
|'Create empty PCollection' >> beam.Create([None])
|'Get accepted batch file'>> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
# |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
)
except Exception as exception:
logging.error(exception)
pass
#=========================================================
if __name__ == "__main__":
run()
当您创建模板时,使用的Apache Beam SDK似乎与setup.py文件中的包版本前向兼容,并且工作正常;但是,当您执行更新时,SDK版本可能无法与setup.py.中列出的相同版本前向兼容
基于这个文档,Apache Beam SDK和Dataflow workers必须具有向前兼容的库,以避免版本冲突,这可能导致服务中的意外行为。
为了了解每个Apache Beam SDK版本中所需的包版本,请看一下这个页面。
我已经开始使用Scala SDK Scio开发我的第一个DataFlow工作。数据流作业将以流模式运行。 有谁能建议最好的部署方法吗?我已经在Scio文档中阅读了他们使用的,然后将其部署到Docker容器中。我也读过关于使用数据流模板的内容(但不是很详细)。 什么是最好的?
当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?
我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)
使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:
在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?