当前位置: 首页 > 知识库问答 >
问题:

从Google云存储流数据流到Big Query

宁浩博
2023-03-14

我正在尝试使用DataFlow(Java)将数据从云存储插入到Big Query中。我可以批量上传数据;但是,我想要设置一个流式上传代替。因此,当新对象添加到我的bucket时,它们将被推送到BigQuery。

我已经将PipelineOptions设置为流,并且在GCP控制台UI中显示dataflow管道是流类型的。bucket中的初始文件/对象集被推送到BigQuery。

但是当我向桶中添加新对象时,这些对象不会被推送到BigQuery。这是为什么?我如何使用蒸汽数据流管道将添加到我的云存储中的对象推送到BigQuery?

//Specify PipelineOptions
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);


  options.setProject(<project-name>);
  options.setStagingLocation(<bucket/staging folder>);    
  options.setStreaming(true);
  options.setRunner(DataflowRunner.class);

我的解释是,因为这是一个流式管道,当我向云存储添加对象时,它们将被推送到BigQuery。

请建议。

共有1个答案

万俟穆冉
2023-03-14

如何创建输入集合?您需要有一个无界输入,以便流管道保持打开状态,否则它将只是临时的(但将使用流插入)。您可以通过从包含桶中所有更改的订阅中读取内容来实现这一点,请参阅https://cloud.google.com/storage/docs/pubsub-notifications了解详细信息。

 类似资料:
  • 我们将Google Cloud Datastore用于Google App Engine(GAE)应用程序。在我们的项目改造期间,我们希望将数据库从Datastore迁移到Google CloudSQL。 我们在数据存储中约有1 TB数据,不包括索引。 如何从数据存储迁移到云SQL,是否有任何现有的开源解决方案可用于此。 我已经检查了下面的一个https://cloud.google.com/da

  • 这两个URL之间有区别吗?一个直接指向mp4,然后另一个URL是“下载链接”?有区别吗? 在谷歌云平台中有这样存储文件的选项吗?

  • 我需要从Google Cloud Storage(GCS->Temp Table->Main table)中加载100个表到BigQuery。我创建了一个python进程,将数据加载到BigQuery中,并在AppEngine中进行调度。因为AppEngine最多有10min的超时时间。我已经在异步模式下提交了作业,并在稍后的时间点检查了作业状态。由于我有100个表,需要创建一个监控系统来检查作业

  • 我试图将一个用编写的函数迁移到 实时解压缩并逐行读取 对每一行执行一些光转换 将未压缩的输出(一次一行或块)写入GCS 输出是>2GB,但略小于3GB,所以它适合。 null AFAIC,我将坚持,因为输出可以放入内存--就目前而言--但是多部分上传是以最少的内存支持任何输出大小的方法。 想法还是替代方案?

  • 有人能帮我做这个吗?

  • 这是可行的,但这里的问题是,在流回此方法的客户端之前,它必须首先缓冲所有字节。这会导致很多延迟,尤其是当存储在GCS中的文件很大时。 是否有一种方法可以从GCS获取文件并将其直接流到OutputStream,这里的OutputStream是针对servlet的。