GCP数据流状态完成/完成后,是否有任何方式进行后处理。我有一个过程,数据流从GCP存储中批量读取一个文件,并执行一些外部api调用以进行转换,然后写回另一个文件。在所有批次转换/处理后,我需要做一些额外的处理。有办法吗?我正在使用ApacheBeam和模板来运行GCPDatatflow。
对有两个选项可供选择:
PipelineResult.waitUntilFinish()
,然后运行其他代码。我知道我可以用云函数和PubSub通知来完成每个写入的文件,但我更喜欢只在整个文件夹完成时这样做一次。 谢了!
长话短说,我有一个cron工作,每天在指定的时间将一堆文件上传到云存储桶中。所有这些bucket都有一个关联的发布/订阅通知主题,该主题在文件创建事件时触发。每个事件都会触发一个数据流作业来处理该文件。 问题是这会在几秒钟内实例化100个并行批处理作业。每个作业都会使用HTTP请求来关闭我的下游服务。这些服务无法足够快地扩展,并开始抛出连接拒绝错误。 为了限制这些请求,我限制了每个数据流作业的可用
我每天都在python dataflow工作中得到这个错误。 我使用的是Apache Beam2.15(与2.17一样)Python 3.7。 2020-01-28 17:08:53.801来自工作人员的GM恐怖消息:处理在步骤s03中停留了至少10m00s,没有在sun.misc.unsafe.park(本机方法)在java.util.concurrent.locks.locksupport.p
我正在尝试在GCP数据流中运行批处理作业。工作本身有时会占用大量内存。目前,工作一直在崩溃,因为我相信每个工作人员都在试图同时运行pcollection的多个元素。有没有办法防止每个工人一次运行多个元素?
我正在尝试找出是否有任何GCP数据流模板可用于使用“Pub/Sub to Cloud Spanner”进行数据摄取。我发现已经有一个默认的GCP数据流模板可用于示例-“Cloud Pub/Sub to BigQuery”。所以,我有兴趣看看我是否可以在流或批处理模式下对扳手进行数据摄取,以及行为会如何
我们正在尝试使用GCP数据流和Python作业模板连接到Oracle数据库。当我们使用无法访问Internet的特殊子网来运行数据流作业时,我们使用setup.py.从GCS存储桶安装依赖包 下面是使用 setup.py 创建数据流模板的命令行: < code>python3 -m 依赖项包存储在 GCP 存储桶中,并将复制到数据流工作线程,并在作业运行时安装在数据流工作线程上。对于 Oracle