当前位置: 首页 > 知识库问答 >
问题:

Apache Beam SDK 2.20.0的数据流错误

孙永思
2023-03-14

我试图用Beam sdk Version2.20.0在Python 3.7中构建一个Apache Beam管道,该管道成功地部署在Dataflow上,但似乎没有做任何事情。在工作日志中,我可以看到重复报告的以下错误消息

同步pod xxxxxxxxxxxx()时出错,跳过:启动容器工作日志失败

我已经尝试了我能尝试的一切,但这个错误是相当顽固的,我的管道看起来像这样。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import DebugOptions

 options = PipelineOptions()

    options.view_as(GoogleCloudOptions).project = PROJECT
    options.view_as(GoogleCloudOptions).job_name = job_name
    options.view_as(GoogleCloudOptions).region = region
    options.view_as(GoogleCloudOptions).staging_location = staging_location
    options.view_as(GoogleCloudOptions).temp_location = temp_location

    options.view_as(WorkerOptions).zone = zone
    options.view_as(WorkerOptions).network = network
    options.view_as(WorkerOptions).subnetwork = sub_network
    options.view_as(WorkerOptions).use_public_ips = False

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    options.view_as(StandardOptions).streaming = True

    options.view_as(SetupOptions).sdk_location = ''
    options.view_as(SetupOptions).save_main_session = True

    options.view_as(DebugOptions).experiments = []

    print('running pipeline...')

    with beam.Pipeline(options=options) as pipeline:
        (
                pipeline
                | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=topic_name).with_output_types(bytes)
                | 'ProcessMessage' >> beam.ParDo(Split())
                | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table=bq_table_name,
                                                               schema=bq_schema,
                                                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
         )

    result = pipeline.run()

我尝试使用sdk_location参数从compute实例提供一个beam sdk 2.20.0.tar.gz,这也不起作用。我不能使用sdk_location=default,因为这会触发从pypi.org下载。我在一个离线环境中工作,连接到互联网不是一个选择。如有任何帮助,不胜感激。

管道本身部署在容器上,apache Beam2.20.0附带的所有库都在requirements.txt文件中指定,docker映像安装所有库。

共有1个答案

滑乐逸
2023-03-14

TL;DR:将Apache Beam SDK归档复制到一个可访问的路径中,并将该路径作为变量提供。

我也在纠结这个设置。最后我找到了一个解决方案--即使你的问题是在几天前提出的,这个答案可能会帮助其他人。

可能有多种方法可以做到这一点,但以下两种方法相当简单。

创建所需beam_sdk版本的tar.gz源存档,如下所示:

python setup.py sdist 

现在您应该在路径beam/sdks/python/dist/中有源代码归档apache-beam-2.28.0.tar.gz

选项1-使用Flex模板并在DockerFile
文档中复制Apache_Beam_SDK:Google Dataflow文档

  1. 创建一个Dockerfile-->您必须包含这个副本utils/apache-beam-2.28.0.tar.gz/tmp,因为这将是您可以在setupoptions中设置的路径。
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}

WORKDIR ${WORKDIR}

# Due to a change in the Apache Beam base image in version 2.24, you must to install
# libffi-dev manually as a dependency. For more information:
# https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891

# update used packages
RUN apt-get update && apt-get install -y \
    libffi-dev \
 && rm -rf /var/lib/apt/lists/*


COPY setup.py .
COPY main.py .

COPY path_to_beam_archive/apache-beam-2.28.0.tar.gz /tmp

ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"

RUN python -m pip install --user --upgrade pip setuptools wheel
    options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
gcloud dataflow flex-template build "gs://define-path-to-your-templates/your-flex-template-name.json" \
 --image=gcr.io/your-project-id/image-name:tag \
 --sdk-language=PYTHON \
 --metadata-file=metadata.json
gcloud dataflow flex-template run "your-dataflow-job-name" \
--template-file-gcs-location="gs://define-path-to-your-templates/your-flex-template-name.json" \
--parameters staging_location="gs://your-bucket-path/staging/" \
--parameters temp_location="gs://your-bucket-path/temp/" \
--service-account-email="your-restricted-sa-dataflow@your-project-id.iam.gserviceaccount.com" \
--region="yourRegion" \
--max-workers=6 \
--subnetwork="https://www.googleapis.com/compute/v1/projects/your-project-id/regions/your-region/subnetworks/your-subnetwork" \
--disable-public-ips

选项2-Copy sdk_location from GCS
根据Beam文档,您甚至可以为选项sdk_location直接提供一个GCS/gs://path,但它对我不起作用。但以下措施应该奏效:

  1. 将以前生成的归档文件上传到一个bucket,您可以从希望执行的数据流作业中访问该bucket。可能是gs://yourbucketname/beam_sdks/apache-beam-2.28.0.tar.gz
  2. 将源代码中的apache-beam-sdk复制到例如。/tmp/apache-beam-2.28.0.tar.gz
# see: https://cloud.google.com/storage/docs/samples/storage-download-file
from google.cloud import storage

def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # bucket_name = "your-bucket-name"
    # source_blob_name = "storage-object-name"
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()
    bucket = storage_client.bucket("gs://your-bucket-name")

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob("gs://your-bucket-name/path/apache-beam-2.28.0.tar.gz")
    blob.download_to_filename("/tmp/apache-beam-2.28.0.tar.gz")

options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
 类似资料:
  • 对于Oracle12.2,输出如下: JDBC驱动程序错误地自动检测字符集为UTF-8,但流实际上是在ISO8859-15中。在JDBC8中不能显式设置字符集。从数据库返回的流是在Oracle 12.1下用UTF-8编码的

  • 严格的单向数据流是 Redux 架构的设计核心。 这意味着应用中所有的数据都遵循相同的生命周期,这样可以让应用变得更加可预测且容易理解。同时也鼓励做数据范式化,这样可以避免使用多个且独立的无法相互引用的重复数据。 如果这些理由还不足以令你信服,读一下 动机 和 Flux 案例,这里面有更加详细的单向数据流优势分析。虽然 Redux 不是严格意义上的 Flux,但它们有共同的设计思想。 Redux

  • 有时,您希望发送非常巨量的数据到客户端,远远超过您可以保存在内存中的量。 在您实时地产生这些数据时,如何才能直接把他发送给客户端,而不需要在文件 系统中中转呢? 答案是生成器和 Direct Response。 基本使用 下面是一个简单的视图函数,这一视图函数实时生成大量的 CSV 数据, 这一技巧使用了一个内部函数,这一函数使用生成器来生成数据,并且 稍后激发这个生成器函数时,把返回值传递给一个

  • 我正在从GCP中的Cloud Function触发数据流作业。 嵌入云功能的代码 当执行Cloud函数时,数据流作业确实会被触发,但作业一直失败。当我检查作业日志时,我看到这条错误消息- requirements.txt 如果我在安装apache beam[gcp]后直接从Cloud shell运行它,则嵌入云函数中的python代码运行良好。 请分享您对如何克服丢失模块的数据流错误的意见。 谢谢