当前位置: 首页 > 知识库问答 >
问题:

如何将watchfornewfiles与GCS源存储桶一起用于数据流?

章安宜
2023-03-14

参考项目:在Apache Beam中观察与文件夹匹配的新文件

您可以将其用于简单的用例吗?我的用例是我让用户将数据上传到Cloud Storage-

我想做的是保持管道在流式模式下运行,一旦文件上传到Cloud Storage,它就会通过管道进行处理。watchfornewfile可以做到这一点吗?

我编写的代码如下:

p.apply(TextIO.read().from("<bucketname>")         
    .watchForNewFiles(
        // Check for new files every 30 seconds         
        Duration.standardSeconds(30),                      
        // Never stop checking for new files
        Watch.Growth.<String>never()));

没有任何内容被转发到Big Query,但管道显示它正在流式传输。

共有1个答案

壤驷阳波
2023-03-14

你可以在这里使用Google云存储触发器:https://Cloud . Google . com/functions/docs/calling/Storage # functions-calling-Storage-python

这些触发器使用类似于Cloud Pub/Sub的Cloud Functions,如果对象是:创建/删除/存档/或元数据更改,则会在对象上触发。

这些事件是使用来自云存储的发布/订阅通知发送的,但注意不要在同一个bucket上设置许多函数,因为存在一些通知限制。

此外,在文档的结尾有一个示例实现的链接。

 类似资料:
  • 问题内容: 我正在使用Spring,Spring Data JPA,Spring Security,Primefaces的项目中工作… 在本教程中,你只能在预定义的数据源之间实现动态数据源切换。 这是我的代码的片段: springContext-jpa.xml 我想做的就是使targetDataSources映射也与其元素一样动态。 换句话说,我想获取某个数据库表,使用存储在该表中的属性创建我的数

  • 我在一个使用Spring、Spring数据JPA、Spring Security、Primefaces的项目中工作。。。 我正在学习关于spring动态数据源路由的教程。 在本教程中,您只能在预定义的数据源之间实现动态数据源切换。 以下是我的代码片段: springContext jpa。xml 我想做的是使targetDataSources映射与它的元素一样动态。 换句话说,我想获取一个特定的数

  • 我为我的spring boot应用程序开发集成测试,该应用程序与Cassandra一起工作。我使用与Cassandra通信。 有什么方法可以将Testcontainers Cassandra与Spring数据一起用于Apache Cassandra吗?

  • 我有一个名为UserRepository的通用存储库接口。然后我有一个接口,它从MyUserRepository扩展而来。它处理一个MyUser类,该类扩展了User。 我还有一个名为UserService的服务接口和一个名为MyUserServiceImpl的类。 该服务需要UserRepository的实例,我虽然可以使用某种注释,如@Qualifer,但它不起作用。 应用程序无法启动 说明:

  • 我有一张桌子,比如 as 希望将值聚合或将值条柱到 如何在SQL或更具体的spark sql中执行此操作? 目前我有一个侧视图,但这看起来相当笨拙/低效。 分位数离散化并不是我真正想要的,而是这个范围的。 https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache