当前位置: 首页 > 知识库问答 >
问题:

如何从数据流批量(高效)发布到pub/sub?

黄宏旷
2023-03-14

我想发布消息到一个发布/子主题与一些属性感谢数据流作业在批处理模式。

它与@ankur解决方案一起工作:https://stackoverflow.com/a/55824287/9455637

但我认为使用共享的pub/sub客户端会更有效:https://stackoverflow.com/a/55833997/9455637

但是发生了一个错误:

    null
import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from google.cloud.pubsub_v1 import PublisherClient

import json
import argparse
import re
import logging


class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)


# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings)
        super().__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(
            topic=element["topic"],
            data=json.dumps(element["data"]).encode("utf-8"),
            **element["attributes"],
        )

        return future.result()


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--source_table_id",
        dest="source_table_id",
        default="",
        help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).

    pipeline_options = PipelineOptions(pipeline_args)
    # pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    bq_source_table = known_args.source_table_id
    bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"

    regex_match = re.search(bq_table_regex, bq_source_table)

    if not regex_match:
        raise ValueError(
            f"Bad BigQuery table id : `{bq_source_table}` please match {bq_table_regex}"
        )

    table_ref = bigquery.TableReference(
        projectId=regex_match.group("PROJECT_ID"),
        datasetId=regex_match.group("DATASET_ID"),
        tableId=regex_match.group("TABLE_ID"),
    )

    with beam.Pipeline(options=pipeline_options) as p:

        (
            p
            | "ReadFromBqTable" # 
            >> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
            | "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

暂时还没有答案

 类似资料:
  • 我的管道是IoTCore->pub/sub->Dataflow->bigQuery。最初,我得到的数据是Json格式的,管道工作正常。现在我需要转向csv,问题是我使用的Google定义的数据流模板使用Json输入而不是csv。是否有一种简单的方法通过数据流将csv数据从pub/sub转移到bigquery。模板可能会改变,但它是用Java实现的,我从来没有用过,所以需要很长时间来实现。我还考虑过

  • 问题内容: 我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)内累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。 我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。 这是到目前为止我所

  • 为了良好地沟通,你必须认识到它的困难。它本身就是一种技能。与你交流的人本身是有瑕疵的,这一事实使得沟通变得更加困难。他们不会努力去理解你。他们不善言辞。他们经常过度工作或者无聊,至少,有时候只关注他们自己的工作而非你要发表的长篇大论。上课,练习写作,公共演讲,聆听,这些东西的一个好处是,如果你擅长它们,你可以更容易看到问题所在以及解决方法。 程序员是一种社会动物,他们的生存依赖于与团队的交流。高级

  • 我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设

  • 我有一个在< code>postgresql数据库上使用< code>typeorm的更新查询,如下所示,该查询频繁地在20个项目的列表上执行(每30秒一次)。大约需要。更新12秒,对我的极限来说已经很多了。 是否有可能在单个查询中执行这样的批量更新,而不是迭代其他项?如果是的话-怎么做? 和对于每个项目都是唯一的。

  • 问题内容: 我已经开始尝试使用Django REST框架。到目前为止,我已经成功地为我的对象创建了一个序列化程序,通过Javascript的$ .post()创建了发布视图,发布对象和返回对象。因此,现在我可以在JSON和Django模型对象之间进行适当的转换。 问题是我有一个对象数组[A1,A2,…,An]。现在,当我需要发布这样的数组时,我逐个对象地进行处理。是否有可能一次发布整个数组,并在D