当前位置: 首页 > 知识库问答 >
问题:

Google Cloud Dataflow自定义模板-仅在流媒体管道中

闽承望
2023-03-14

我正在尝试为Google的数据流创建一个自定义模板。我只想将一些消息从Pubsub打印到控制台。当我尝试暂存我的模板时,我收到一个错误,即Cloud Pub/Sub仅适用于流式传输管道,而我的管道旨在成为流式传输管道:x。我在做什么使我的管道批处理而不是流式传输?

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrintExample(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--welcome', type=str)


TOPIC = ...
PROJECT = ...
BUCKET = ...


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1'
)

with beam.Pipeline(options=pipeline_options) as p:
    options = pipeline_options.view_as(PrintExample)
    (
        p
        | "Extract PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC)
        | "Print" >> beam.Map(print)
    )
    p.run()

然后我就跑

python -m PrintTemplate.py 
    --runner DataflowRunner --project [PROJECT] 
    --staging_location gs://[BUCKET]/staging 
    --temp_location gs://[BUCKET]/temp 
    --template_location gs://[BUCKET]/templates/PrintTemplate

导致:

ValueError: Cloud Pub/Sub is currently available for use only in streaming pipelines.

共有1个答案

穆英飙
2023-03-14

你就快到了。只需将--stream添加到您的命令中。

python -m PrintTemplate.py 
    --runner DataflowRunner --project [PROJECT] 
    --staging_location gs://[BUCKET]/staging 
    --temp_location gs://[BUCKET]/temp 
    --template_location gs://[BUCKET]/templates/PrintTemplate
    --streaming

我看到您正在使用PipelineOptions。您还可以传递流=True。

pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project=PROJECT,
    job_name='printtemplate01',
    temp_location='gs://{}/temp'.format(BUCKET),
    region='us-central1',
    streaming=True
)
 类似资料:
  • 使装饰器包含具有name属性的管道元数据。 此值将用于在模板表达式中调用此管道。 它必须是有效的JavaScript标识符。 实现PipeTransform接口的transform方法。 此方法接受管道的值和任何类型的可变数量的参数,并返回一个变换的(“管道”)值。 import { Component } from '@angular/core'; selector: 'app-root',

  • 即。在货币管道上完成一些额外的格式化。为此,我想在自定义管道的组件代码中使用现有管道。

  • 我为媒体添加了一个自定义分类法,它在媒体管理部分显示为一个文本字段。我想这是典型的复选框格式,因为它存在于自定义帖子类型管理页面。有没有一种方法可以在函数中覆盖此选项,使此自定义分类显示在复选框中,以便用户可以轻松选择属于特定分类条目的图像? 下面是我用来将分类法引入媒体库的代码: 我找到了这篇文章,但从未使用过过滤器,如何让它为我工作有点困惑: https://wordpress.stackex

  • 问题内容: 我有一个使用Bootstrap中的标记的表单,如下所示: 这里有很多样板代码,我想简化为一个新的指令-form-input,如下所示: 产生: 我通过一个简单的模板完成了很多工作。 但是,当我开始添加更多高级功能时,我就陷入了困境。 如何在模板中支持默认值? 我想在指令中将“ type”参数公开为可选属性,例如: 但是,如果未指定任何内容,则默认为。我该如何支持? 如何根据属性的存在/

  • Angular 2还具有创建自定义管道的功能。 定义自定义管道的一般方法如下。 import { Pipe, PipeTransform } from '@angular/core'; @Pipe({name: 'Pipename'}) export class Pipeclass implements PipeTransform { transform(parameters): r