我有一个从BigQuery获取数据并将其写入GCS的管道,但是,如果我发现任何拒绝,我希望将它们正确地添加到一个BigQuery表中。我将拒绝收集到一个全局列表变量中,然后将列表加载到BigQuery表中。当我在本地运行它时,这个过程工作得很好,因为管道以正确的顺序运行。当我使用dataflowrunner运行它时,它不能保证顺序(我希望pipeline1在Pipeline2之前运行。有没有一种方法可以使用Python在Dataflow中使用依赖的管道?或者也请建议是否可以用更好的方法解决这个问题。提前谢谢。
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(lambda x: somefunction) # Collecting rejects in the except block of this method to a global list variable
....etc
| 'to gcs' >> beam.io.WriteToText(output)
)
# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
rejects = (pipeline2
| 'create pipeline' >> beam.Create(reject_list)
| 'to json format' >> beam.Map(lambda data: {.....})
| 'to bq' >> beam.io.WriteToBigQuery(....)
)
您可以这样做,但只需要1个管道和转换中的一些附加代码。
beam.map(lambda x:somefunction)
应该有两个输出:写入GCS的输出,以及最终写入BigQuery的被拒绝的元素。
为此,转换函数必须返回taggedoutput
。
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform
函数将如下所示
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)
我试图执行下面的代码,但它在管道中抛出未处理的promise拒绝警告。在本地,它工作正常,没有任何问题。 日志错误:找不到进程“3224”。(节点:836)未处理的PromisejectionWarning:#(节点:836)未处理的PromisejectionWarning:未处理的promise拒绝。此错误源于在没有catch块的情况下抛出异步函数的内部,或者拒绝使用未处理的promise。c
我正在尝试使用动态ChannelHandler管道实现Netty 4. X。正如人们建议的“出于性能考虑,在运行时使用调用而不是管道修改”,我实现了一个Server、一个RouterInoundHander和一个Client来测试这个理论。但它不起作用。这是我的代码 计算机网络服务器 RouterInboundHandler 和客户 如代码所示,在Channel的连接初始化阶段创建了Channel
问题内容: 有谁知道bash如何通过管道发送数据? 此命令是否将file.txt的所有内容打印到缓冲区中,然后由tail读取?还是说,此命令是逐行打印file.txt的内容,然后在每一行停顿以进行尾部处理,然后请求更多数据? 我问的原因是我要在嵌入式设备上编写程序,该程序基本上对某些数据块执行一系列操作,其中一个操作的输出作为下一个操作的输入发出。我想知道linux(bash)是如何处理的,因此请
我试图通过“网络在行动”这本书来掌握网络概念。 在我看来,有几个概念解释得不太好或太模糊。因此,我想我会来这里就这些话题做一些明确的解释。 渠道管道: 所以我有一个这样的渠道管道: 对于channelInitializer,从概念上讲,我会假设该过程将按以下顺序进行:
作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题
当玩家点选菜单上的「系统 - 返回之前」后,可以返回上一个段落,通过记录就是指定这种自动存储位置的功能。 要使用通过记录功能,需要用到 record 指令、此外还要对 Config.tjs 的 recordHistoryOfStore 进行设定。 「系统 - 返回之前」这一菜单选项默认是不显示的,但是可以在 Config.tjs 中将 goBackMenuItem.visible 设定为 t