当前位置: 首页 > 知识库问答 >
问题:

从主题PubSub读取数据流并写入Bigquery(multiples tables)

佴阳曦
2023-03-14

有一个在Dataflow中使用过DynamicDestination的人,他有一个简单的描述示例。在git(https://github.com/googleCloudPlatform/dataflowTemplates/blob/master/src/main/Java/com/google/cloud/teleport/templates/dlpTextToBigQueryStreaming.)中看到示例teleport已经让我感到厌烦了,作为apache Beam的新手,我很受伤。顺便说一下,我需要做的是从Pubsub读取消息,并通过Dataflow作业写入BigQuery数据集中的不同目的地(表)。我有一个自定义项目,它非常适合Bigquery表,但Pubsub主题将包含来自同一数据集的多个目的地。此外,消息是JSON格式的,并且包含一个带有目标表名称的字段。

这是我最有代表性的代码

TopicToBigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TopicToBigQueryOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(Constants.READ_PUBSUB, PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
         .apply(Constants.LINE_TO_CHAMP, new PubSubToTableRowTransform())
         .apply(Constants.WRITE_CHAMPBAN, BigQueryIO.writeTableRows()
                .to(options.getTableStagingFileLines())
                .withSchema(AmplaChangeLogSchema.getTableSchema())
                .withCreateDisposition(CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

有什么建议吗?

最诚挚的问候

共有1个答案

祖利
2023-03-14

正如我在评论中提到的,原作者(@Ryan McDowell)解释了非常相同的用户场景,使用GCP pub/sub消息队列中的JSON有效负载,执行到Bigquery表的动态路由,从pub/sub消息中提取特定属性中的特定表名。

在示例的管道中,我们看到从DynamicDestinations类继承的GetTableDestination()方法,该方法用于从包含Bigquery表名的消息中提取特定属性(TableNameAttr),最后标识目标对象TableDestination()。

 类似资料:
  • 我想使用Cloud Dataflow,PubSub和Bigquery将tableRow写入PubSub消息,然后将它们写入Bigquery。我希望表名、项目id和数据集id是动态的。 我在internet上看到下面的代码,我不明白如何传递数据行参数。 先谢谢你,盖尔

  • 嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题 然后我得到以下错误 线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行; 然后,我对代码进行了如下编辑,以从Kafka中读取并写

  • 我试图用Cloud Dataflow(Beam Python SDK)将它读写到BigQuery。 读写2000万条记录(约80 MBs)几乎需要30分钟。 查看dataflow DAG,我可以看到将每个CSV行转换为BQ行花费了大部分时间。

  • 我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不

  • 下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错: