当前位置: 首页 > 知识库问答 >
问题:

云数据流、PubSub和Bigquery问题

蔡鹏程
2023-03-14

我想使用Cloud Dataflow,PubSub和Bigquery将tableRow写入PubSub消息,然后将它们写入Bigquery。我希望表名、项目id和数据集id是动态的。
我在internet上看到下面的代码,我不明白如何传递数据行参数。

public void PubSub(String projectId , String datasetId,String tableId,String topicId)       
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    PCollection<TableRow> input = pipeline.apply(PubsubIO.Read.topic(createTopic(projectId,topicId).getName()).withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));

    input.apply(BigQueryIO.Write.to(getTableReference(projectId,datasetId, tableId)).withSchema(getSchema()));

    pipeline.run();
}


private static TableReference getTableReference(String projectId , String datasetId,String tableId) {
      TableReference tableRef = new TableReference();
      tableRef.setProjectId(projectId);
      tableRef.setDatasetId(datasetId);
      tableRef.setTableId(tableId);
      return tableRef;
}

先谢谢你,盖尔

共有1个答案

夏侯宏旷
2023-03-14

BigQueryIo.Write转换不支持动态输出。但是您可以直接从DOFN进行BigQuery API调用。

这样,您就可以根据代码的计算,将表名设置为任何您想要的名称。这可以从侧面输入中查找,或者直接从DoFn当前正在处理的元素中计算。

为了避免对BigQuery进行太多的小调用,可以使用finishBundle()对请求进行批处理;

我不完全理解您是否要将Dataflow写到pub/sub,然后将pub/sub写到bigquery?您可以直接写入BigQuery而不使用pub/sub。

 类似资料:
  • 我试图用Cloud Dataflow(Beam Python SDK)将它读写到BigQuery。 读写2000万条记录(约80 MBs)几乎需要30分钟。 查看dataflow DAG,我可以看到将每个CSV行转换为BQ行花费了大部分时间。

  • 我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。

  • 由于我刚接触DataFlow/Beam,概念还不太清楚(或者至少我在开始编写代码时有困难),我有很多问题: 什么是最好的模板或模式,我可以用来做到这一点?我应该先执行BigQuery的PTransform(然后执行PubSub的PTransform)还是先执行PubSub的PTransform? 我怎么做加入?比如? PubSub的最佳窗口设置是什么?BigQuery的PTransform部分的窗

  • 有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.

  • 我正在尝试设置我的开发环境。我一直在使用pubsub模拟器进行开发和测试,而不是在生产中使用谷歌云pubsub。为此,我设置了以下环境变量: 这适用于python google pubsub库,但当我切换到使用java apache beam进行google数据流时,管道仍然指向生产google pubsub。管道上是否有需要设置的设置、环境变量或方法,以便管道读取本地pubsub仿真器?

  • 如何使用带有DataflowRunner的apache光束从Google BigQuery数据集获取表列表? 我找不到如何从指定的数据集中获取表。我想使用数据流的并行处理编程模型将表从位于美国的数据集迁移到位于欧盟的数据集。