当前位置: 首页 > 知识库问答 >
问题:

是否可以将Pub/Sub和BigQuery作为Google数据流中的输入?

施轶
2023-03-14

在我的项目中,我希望在Google Dataflow中使用流式管道来处理发布/订阅消息。在清理输入数据时,我还希望从BigQuery获得一个侧面输入。这会导致两个输入中的一个不工作。

我在流媒体的Pipeline选项中设置了=True,这允许Pub/Sub输入正确处理。但是BigQuery与流媒体管道不兼容(见下面的链接):

https://cloud.google.com/dataflow/docs/resources/faq#what_are_the_current_limitations_of_streaming_mode

我收到了这个错误:“ValueError: Cloud Pub/Sub目前仅可用于流媒体管道。”基于限制,这是可以理解的。

但我只希望使用BigQuery作为辅助输入,以便将数据映射到传入的发布/订阅数据流。它在本地运行良好,但一旦我尝试在数据流上运行它,它就会返回错误。

有没有人找到一个很好的解决方法?

编辑:在下面添加我的管道框架以供参考:

# Set all options needed to properly run the pipeline
options = PipelineOptions(streaming=True,
                          runner='DataflowRunner', 
                          project=project_id)

p = beam.Pipeline(options = options)

n_tbl_src = (p
         | 'Nickname Table Read' >> beam.io.Read(beam.io.BigQuerySource(
            table = nickname_spec
        )))

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=AsList(n_tbl_src))
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()

共有1个答案

濮阳
2023-03-14

@Pablo上面的评论是正确的答案。对于任何经历过同样情况的人来说,下面是我脚本中有效的更改。

# This opens the Beam pipeline to run Dataflow
p = beam.Pipeline(options = options)
logging.info('Created Dataflow pipeline.')

# This will pull in all of the recorded nicknames to compare to the incoming PubSubMessages.

client = bigquery.Client()
query_job = client.query("""
    select * from `{0}.{1}.{2}`""".format(project_id, dataset_id, nickname_table_id))
nickname_tbl = query_job.result()
nickname_tbl = [dict(row.items()) for row in nickname_tbl]

# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
              | beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
                                     subscription = 'projects/{0}/subscriptions/{1}'
                                                  .format(project_id, subscription_name),
                                     with_attributes = True)
              | 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
              | 'Fix Value Types' >> beam.ParDo(FixTypesFn())
              | 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
              | 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=nickname_tbl)
              | 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))


# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table = bq_spec,
    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))

# Run the pipeline
p.run()
 类似资料:
  • 我曾经使用过SpringCloudDataFlow、rabbitmq和kafka,但我想知道是否可以使用GooglePub/sub安装scdf。 我不想创建一个流(新的应用程序spring cloud stream),将源或接收器连接到gcp,我希望google pub/sub over spring cloud data flow server用作中间消息代理。 有什么建议吗?

  • 我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设

  • 我试图通过数据流模板“Pub/Sub Avro to Bigquery”,将数据从Pub/Sub流到Bigquery。发布/订阅中的数据采用AVRO格式,来自Kafka主题。我从架构注册表中获得的对应架构文件。它看起来是这样的: 保存的schema.avsc中没有换行符,我在数据流中收到这个错误: 当我手动使用主题中的消息时,我能够使用完全相同的模式进行解码,但是我需要处理消息前面的五个额外字节。

  • 如果我想发送消息到谷歌PubSub并使用它的消息。您建议使用Spring cloud GCP库还是只使用Google cloud Java API。 有人能区分这两者吗?或者与谷歌云pubsub库相比,Spring Cloud gcp提供了哪些功能。

  • null null null 我的问题是:如何使用自定义时间戳处理数据,并能够在使用Beam API定义的windows上操作?

  • 但理想情况下,我只希望有一些不处理插入错误并记录它的配置,而只是崩溃作业或至少停止摄入。