当前位置: 首页 > 知识库问答 >
问题:

谷歌云数据流--从PubSub到Parquet

楚茂实
2023-03-14

我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。

共有1个答案

焦同
2023-03-14

为了进一步对社会作出贡献,我将我们的讨论总结为一个答案。

既然您是从Dataflow开始的,我可以指出一些有用的主题和建议:

>

  • Apache Beam中的PTransform WriteToParquet()builtin方法非常有用。它从记录的PCollection写入Parquet文件。此外,为了使用它并写入parquet文件,您需要按照文档中的指示指定模式。此外,本文还将帮助您更好地理解如何使用该方法,以及如何在Google云存储(GCS)桶中编写该方法。

    我鼓励你阅读上面的链接。然后,如果你有任何其他问题,你可以张贴另一个线程,以便获得更具体的帮助。

  •  类似资料:
    • 我需要从压缩的GCS文件中解析json数据,因为文件扩展名是。gz,所以它应该由dataflow正确地重新组织和处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作得很好。我使用以下方法映射/解析JSON: 你知道原因是什么吗? 运行时的配置: 输入文件名示例:file.gz,命令gsutil ls-l gs://bucket/input/file.gz grep c

    • 问题内容: 建筑: 我们有一个使用2个pubsub主题/订阅对的架构: 定期由cronjob触发主题(例如,每5分钟触发一次)。订阅是我们云功能的触发器。 主题充当我们的一项服务发布的后台作业的队列。云功能在每次执行时读取订阅,以为排队的后台作业提供服务。 这使我们可以控制后台作业的服务频率,而与将它们添加到队列的时间无关。 云功能(由触发)通过pull读取消息。它决定准备好哪些后台作业,并在成功

    • 当我运行Dataflow作业时,它会将我的小程序包(setup.py或requirements.txt)上传到Dataflow实例上运行。 但是数据流实例上实际运行的是什么?我最近收到了一个stacktrace: 但从理论上讲,如果我在做,这意味着我可能没有运行这个Python补丁?你能指出这些作业正在运行的docker图像吗,这样我就可以知道我使用的是哪一版本的Python,并确保我没有在这里找

    • 我需要使用信任存储在谷歌云数据流中建立SSLKafka连接。我可以从存储桶提供它,还是有没有办法将其存储在“本地文件系统”上?

    • 我想使用Cloud Dataflow,PubSub和Bigquery将tableRow写入PubSub消息,然后将它们写入Bigquery。我希望表名、项目id和数据集id是动态的。 我在internet上看到下面的代码,我不明白如何传递数据行参数。 先谢谢你,盖尔

    • 我正在使用dataflow处理存储在GCS中的文件,并写入Bigquery表。以下是我的要求: 输入文件包含events记录,每个记录属于一个EventType; 需要按EventType对记录进行分区; 对于每个eventType输出/写入记录到相应的Bigquery表,每个eventType一个表。 每个批处理输入文件中的事件各不相同; 我正在考虑应用诸如“GroupByKey”和“Parti