当前位置: 首页 > 知识库问答 >
问题:

通过相关管道处理DataFlow/Apache Beam中的拒绝

晋承运
2023-03-14

我有一个从BigQuery获取数据并将其写入GCS的管道,但是,如果我发现任何拒绝,我希望将它们正确地添加到一个BigQuery表中。我将拒绝收集到一个全局列表变量中,然后将列表加载到BigQuery表中。当我在本地运行它时,这个过程工作得很好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在Pipeline2之前运行。有没有一种方法可以使用Python在Dataflow中使用依赖的管道?或者也请建议是否可以用更好的方法解决这个问题。提前谢谢。

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WriteToText(output)
               )

# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WriteToBigQuery(....)
                    )

共有1个答案

丌官星渊
2023-03-14

您可以这样做,但只需要1个管道和转换中的一些附加代码。

beam.map(lambda x:somefunction)应该有两个输出:写入GCS的输出,以及最终写入BigQuery的被拒绝的元素。

为此,转换函数必须返回taggedoutput

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(transform)  # Tagged output produced here

    pcoll_to_gcs = data.gcs_output
    pcoll_to_bq  = data.rejected

    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)

那么transform函数将如下所示

def transform(element):
  if something_is_wrong_with_element:
    yield pvalue.TaggedOutput('rejected', element)

  transformed_element = ....

  yield pvalue.TaggedOutput('gcs_output', transformed_element)
 类似资料:
  • 我试图执行下面的代码,但它在管道中抛出未处理的promise拒绝警告。在本地,它工作正常,没有任何问题。 日志错误:找不到进程“3224”。(节点:836)未处理的PromisejectionWarning:#(节点:836)未处理的PromisejectionWarning:未处理的promise拒绝。此错误源于在没有catch块的情况下抛出异步函数的内部,或者拒绝使用未处理的promise。c

  • 我正在尝试使用动态ChannelHandler管道实现Netty 4. X。正如人们建议的“出于性能考虑,在运行时使用调用而不是管道修改”,我实现了一个Server、一个RouterInoundHander和一个Client来测试这个理论。但它不起作用。这是我的代码 计算机网络服务器 RouterInboundHandler 和客户 如代码所示,在Channel的连接初始化阶段创建了Channel

  • 问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请

  • 我试图通过“网络在行动”这本书来掌握网络概念。 在我看来,有几个概念解释得不太好或太模糊。因此,我想我会来这里就这些话题做一些明确的解释。 渠道管道: 所以我有一个这样的渠道管道: 对于channelInitializer,从概念上讲,我会假设该过程将按以下顺序进行:

  •  当玩家点选菜单上的「系统 - 返回之前」后,可以返回上一个段落,通过记录就是指定这种自动存储位置的功能。  要使用通过记录功能,需要用到 record 指令、此外还要对 Config.tjs 的 recordHistoryOfStore 进行设定。  「系统 - 返回之前」这一菜单选项默认是不显示的,但是可以在 Config.tjs 中将 goBackMenuItem.visible 设定为 t

  • 作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题