我试图用Beam sdk Version2.20.0在Python 3.7中构建一个Apache Beam管道,该管道成功地部署在Dataflow上,但似乎没有做任何事情。在工作日志中,我可以看到重复报告的以下错误消息
同步pod xxxxxxxxxxxx()时出错,跳过:启动容器工作日志失败
我已经尝试了我能尝试的一切,但这个错误是相当顽固的,我的管道看起来像这样。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import DebugOptions
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = PROJECT
options.view_as(GoogleCloudOptions).job_name = job_name
options.view_as(GoogleCloudOptions).region = region
options.view_as(GoogleCloudOptions).staging_location = staging_location
options.view_as(GoogleCloudOptions).temp_location = temp_location
options.view_as(WorkerOptions).zone = zone
options.view_as(WorkerOptions).network = network
options.view_as(WorkerOptions).subnetwork = sub_network
options.view_as(WorkerOptions).use_public_ips = False
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
options.view_as(SetupOptions).sdk_location = ''
options.view_as(SetupOptions).save_main_session = True
options.view_as(DebugOptions).experiments = []
print('running pipeline...')
with beam.Pipeline(options=options) as pipeline:
(
pipeline
| 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=topic_name).with_output_types(bytes)
| 'ProcessMessage' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table=bq_table_name,
schema=bq_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = pipeline.run()
我尝试使用sdk_location参数从compute实例提供一个beam sdk 2.20.0.tar.gz,这也不起作用。我不能使用sdk_location=default,因为这会触发从pypi.org下载。我在一个离线环境中工作,连接到互联网不是一个选择。如有任何帮助,不胜感激。
管道本身部署在容器上,apache Beam2.20.0附带的所有库都在requirements.txt文件中指定,docker映像安装所有库。
TL;DR:将Apache Beam SDK归档复制到一个可访问的路径中,并将该路径作为变量提供。
我也在纠结这个设置。最后我找到了一个解决方案--即使你的问题是在几天前提出的,这个答案可能会帮助其他人。
可能有多种方法可以做到这一点,但以下两种方法相当简单。
创建所需beam_sdk版本的tar.gz源存档,如下所示:
python setup.py sdist
现在您应该在路径beam/sdks/python/dist/
中有源代码归档apache-beam-2.28.0.tar.gz
选项1-使用Flex模板并在DockerFile
文档中复制Apache_Beam_SDK:Google Dataflow文档
副本utils/apache-beam-2.28.0.tar.gz/tmp
,因为这将是您可以在setupoptions中设置的路径。FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
# Due to a change in the Apache Beam base image in version 2.24, you must to install
# libffi-dev manually as a dependency. For more information:
# https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
# update used packages
RUN apt-get update && apt-get install -y \
libffi-dev \
&& rm -rf /var/lib/apt/lists/*
COPY setup.py .
COPY main.py .
COPY path_to_beam_archive/apache-beam-2.28.0.tar.gz /tmp
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
RUN python -m pip install --user --upgrade pip setuptools wheel
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
gcloud dataflow flex-template build "gs://define-path-to-your-templates/your-flex-template-name.json" \
--image=gcr.io/your-project-id/image-name:tag \
--sdk-language=PYTHON \
--metadata-file=metadata.json
gcloud dataflow flex-template run "your-dataflow-job-name" \
--template-file-gcs-location="gs://define-path-to-your-templates/your-flex-template-name.json" \
--parameters staging_location="gs://your-bucket-path/staging/" \
--parameters temp_location="gs://your-bucket-path/temp/" \
--service-account-email="your-restricted-sa-dataflow@your-project-id.iam.gserviceaccount.com" \
--region="yourRegion" \
--max-workers=6 \
--subnetwork="https://www.googleapis.com/compute/v1/projects/your-project-id/regions/your-region/subnetworks/your-subnetwork" \
--disable-public-ips
选项2-Copy sdk_location from GCS
根据Beam文档,您甚至可以为选项sdk_location
直接提供一个GCS/gs://path,但它对我不起作用。但以下措施应该奏效:
gs://yourbucketname/beam_sdks/apache-beam-2.28.0.tar.gz
/tmp/apache-beam-2.28.0.tar.gz
# see: https://cloud.google.com/storage/docs/samples/storage-download-file
from google.cloud import storage
def download_blob(bucket_name, source_blob_name, destination_file_name):
"""Downloads a blob from the bucket."""
# bucket_name = "your-bucket-name"
# source_blob_name = "storage-object-name"
# destination_file_name = "local/path/to/file"
storage_client = storage.Client()
bucket = storage_client.bucket("gs://your-bucket-name")
# Construct a client side representation of a blob.
# Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
# any content from Google Cloud Storage. As we don't need additional data,
# using `Bucket.blob` is preferred here.
blob = bucket.blob("gs://your-bucket-name/path/apache-beam-2.28.0.tar.gz")
blob.download_to_filename("/tmp/apache-beam-2.28.0.tar.gz")
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
对于Oracle12.2,输出如下: JDBC驱动程序错误地自动检测字符集为UTF-8,但流实际上是在ISO8859-15中。在JDBC8中不能显式设置字符集。从数据库返回的流是在Oracle 12.1下用UTF-8编码的
严格的单向数据流是 Redux 架构的设计核心。 这意味着应用中所有的数据都遵循相同的生命周期,这样可以让应用变得更加可预测且容易理解。同时也鼓励做数据范式化,这样可以避免使用多个且独立的无法相互引用的重复数据。 如果这些理由还不足以令你信服,读一下 动机 和 Flux 案例,这里面有更加详细的单向数据流优势分析。虽然 Redux 不是严格意义上的 Flux,但它们有共同的设计思想。 Redux
有时,您希望发送非常巨量的数据到客户端,远远超过您可以保存在内存中的量。 在您实时地产生这些数据时,如何才能直接把他发送给客户端,而不需要在文件 系统中中转呢? 答案是生成器和 Direct Response。 基本使用 下面是一个简单的视图函数,这一视图函数实时生成大量的 CSV 数据, 这一技巧使用了一个内部函数,这一函数使用生成器来生成数据,并且 稍后激发这个生成器函数时,把返回值传递给一个
我正在从GCP中的Cloud Function触发数据流作业。 嵌入云功能的代码 当执行Cloud函数时,数据流作业确实会被触发,但作业一直失败。当我检查作业日志时,我看到这条错误消息- requirements.txt 如果我在安装apache beam[gcp]后直接从Cloud shell运行它,则嵌入云函数中的python代码运行良好。 请分享您对如何克服丢失模块的数据流错误的意见。 谢谢