当前位置: 首页 > 知识库问答 >
问题:

谷歌云数据流中的动态分区?

都才俊
2023-03-14

我正在使用dataflow处理存储在GCS中的文件,并写入Bigquery表。以下是我的要求:

  1. 输入文件包含events记录,每个记录属于一个EventType;
  2. 需要按EventType对记录进行分区;
  3. 对于每个eventType输出/写入记录到相应的Bigquery表,每个eventType一个表。
  4. 每个批处理输入文件中的事件各不相同;

我正在考虑应用诸如“GroupByKey”和“Partition”之类的转换,但是似乎我必须知道开发时确定分区所需的事件的数量(和类型)。

你们有什么好主意来戏剧性地进行分区吗?意味着分区可以在运行时确定?

共有1个答案

柴茂材
2023-03-14

如果输入格式简单,则无需使用dataflow即可完成,而且可能会更节省成本。

 类似资料:
  • 当我运行Dataflow作业时,它会将我的小程序包(setup.py或requirements.txt)上传到Dataflow实例上运行。 但是数据流实例上实际运行的是什么?我最近收到了一个stacktrace: 但从理论上讲,如果我在做,这意味着我可能没有运行这个Python补丁?你能指出这些作业正在运行的docker图像吗,这样我就可以知道我使用的是哪一版本的Python,并确保我没有在这里找

  • 我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。

  • 我需要使用信任存储在谷歌云数据流中建立SSLKafka连接。我可以从存储桶提供它,还是有没有办法将其存储在“本地文件系统”上?

  • 我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中

  • 我正在评估Kafka/Spark/HDFS开发NRT(sub-sec)java应用程序的能力,该应用程序从外部网关接收数据,并将其发布到桌面/移动客户端(消费者),用于各种主题。同时,数据将通过流式处理和批处理(持久性)管道传输,用于分析和ML。 例如,流将是。。。 独立的TCP客户端从外部TCP服务器读取流数据 客户端根据数据包(Kafka)发布不同主题的数据,并将其传递给流管道进行分析(Spa

  • 我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。