当前位置: 首页 > 知识库问答 >
问题:

基于标记输出数的动态分叉梁(数据流)管道

诸葛绍元
2023-03-14

我目前正试图根据数据中包含的特定键,将运行在Google Dataflow上的Beam管道分叉到多个目的地。当使用TaggedOutput标记对“fork”的每个endpoint进行硬编码时,我能够实现这一点。但是,在将来,我并不总是知道底层数据中存在哪些键,因此我希望使用类似于以下的for循环动态创建流程中的后续步骤:

p = beam.Pipeline(options=pipeline_options)

pipe = p | 'ReadFromGCS' >> ReadFromText(args['input']) \
         | 'TagAllLines' >> beam.ParDo(produce_tagged_output_keys).with_outputs()

for client in pipe:
  client = pipe.client | client+'MapLinesToDicts' >> beam.Map(lambda line: dict(record=line)) \
                         | client+'WriteToBQTable' >> WriteToBigQuery(client+'_test', dataset=bq_dataset, project=project_id, schema='record:string')

我的理解是,<代码>的结果。with\u outputs()应该是可编辑的,不是吗?当我运行上述命令时,它会执行管道,而不会出现问题,但会完全忽略for循环。有没有一种方法可以动态地做到这一点,而我却没有?

共有1个答案

戎鹏云
2023-03-14

的结果。with\u outputs()仅在指定标记时才可使用

的结果。不指定标记的with\u outputs()是不可使用的,因为在管道构建时输出还未知
的结果。使用\u输出(tag1,tag2,main=tag0)但是可以:
在此处测试差异

请记住,使用apache束,您首先构建管道,然后让数据流过它。因此,您不能让您的管道架构在运行时依赖于数据。

不过,我会提出另一种解决方案:

假设您有一个pcollection集合,其元素的结构如下:

(key, bq_row)

其中,键是决定要写入哪个表的功能,以及要写入bigquery行的数据的bq\u行。

然后,您可以按键对元素进行分组:

grouped = collection | beam.GroupByKey()

现在,每个不同的键都有一个元素。因此,一个元素中的所有bq_row都应该写入同一个大查询表,这取决于元素键。

然后可以定义一个DoFn,它将每个元素中的所有行写入相应的表。(示例代码)

class WriteToBigQueryFn(beam.DoFn):

    def __init__(self, dataset_name):
        super(BigQueryWriter, self).__init__()
        self.client = bigquery.Client()
        self.dataset = client.dataset(dataset_name)

    def process(self, (key, data)):
        table_name = get_table_name(key) # get table name based on key
        table = self.dataset.table(table_name)
        table.reload(self.client)

        table.insert_data(data)

grouped | beam.ParDo(WriteToBigQueryFn)
 类似资料:
  • Serverless 适合用于事件驱动型应用,以及定时任务。今天,让我们来看看一个事件驱动的例子。 在之前的那篇《Serverless 应用开发指南:CRON 定时执行 Lambda 任务》中,我们介绍了如何调度的示例。 最初我想的是通过 Lambda + DynamoDB 来自定义数据格式,后来发现使用 Kinesis Streams 是一种更简单的方案。 Amazon Kinesis Stre

  • 我有许多未分区的大型BigQuery表和文件,我希望以各种方式对它们进行分区。因此,我决定尝试编写一个数据流作业来实现这一点。我认为这工作很简单。我尝试使用泛型编写,以便轻松地应用TextIO和BigQueryIO源代码。它在小型表上工作得很好,但在大型表上运行时,我总是得到。 在我的主类中,我要么读取一个带有目标键的文件(由另一个DF作业生成),要么对一个BigQuery表运行一个查询,以获得要

  • 长话短说:java.io包中有多少种基于数据流的流?它们是字节流和字符流还是二进制流和字符流? 完整问题: https://youtu.be/v1_ATyL4CNQ?t=20m5s昨天看了本教程后跳到20:05,我的印象是基于数据流有两种类型的流:BinaryStreams和CharacterStreams。今天,在了解了更多关于这个主题的知识之后,我的新发现似乎与旧发现相矛盾。 互联网上的大多数

  • 问题内容: 我正在尝试编写查询以创建数据的“表”,如下所示: 这将返回类似: 现在,我希望结果是这样的(来自MySQL查询,而不是由应用程序操纵): 有两个注意事项: 日期范围可以增加或缩小(取决于查询) 在某些情况下,PercentChange可能为null(假设category_7 / 2011-05-12可能没有设置值) 所以最终我不太确定如何构建查询的选择部分以反映动态的列数(我知道它与C

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。

  • 我必须创建一个需要与Neo4J数据库交互的Web服务,使用Spring框架和Spring-Data-Neo4J。这需要静态数据域模型,例如定义的标签、关系、属性。 问题是,我的数据是基于本体论(通过neosemantics插件),将来可以修改。如果应用程序能够自动采用它,那就太好了。这样,数据模型可以只通过编辑本体论来扩展,不需要额外的编程知识。 这是否意味着我必须动态地生成Spring数据类(基