当前位置: 首页 > 知识库问答 >
问题:

Apache Beam:在PCollection中读取管道的PBegin

田向荣
2023-03-14
    null
Pipeline p2 = Pipeline.create();
p2.apply(// READ IN THE PCOLLECTION HERE)
  .apply(TextIO.write().to("/Users/my/local/fp")));
p2.run()

共有1个答案

罗鸿福
2023-03-14

为了将pcollection读入输入,您需要从源读取它。即。一些数据存储在BigQuery、Google云存储等中。您可以使用特定的源转换从这些位置读取这些数据。根据存储数据的位置,您需要使用正确的源并传入相关参数(即GCS路径、BigQuery表)

请看一下apache beam网站上的最小字数示例(github上的完整源代码)。我建议从这段代码开始,并在它上迭代,直到构建出所需的管道。

在本例中,文件是从GCS读取的

p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
 类似资料:
  • 问题内容: 我试图读取就是BEING柏油,流媒体,以标准输入存档,但我不知怎么读 远远 在管道的数据超过焦油发送。 我这样运行我的命令: 源代码是这样的: 对于100MB的涂油文件夹,我将获得1468个4MB的块(即6.15GB)!此外,数组的大小似乎无关紧要:如果将块大小设置为40MB,我仍然会获得约1400个40MB数据块,这根本没有意义。 要使用Go正确读取数据,我需要做些什么吗? 问题答案

  • 我在谷歌云数据流中运行批处理管道。我需要在一个管道中读取另一个管道以前写过的对象。最简单的wa对象是pickle/dill。 编写工作很好,编写了许多文件,每个文件都有一个pickled对象。当我手动下载文件时,我可以打开文件。编写代码: 要么... ...或者... (对象的类位于带有的路径中,但不确定为什么会错过最后一个字符) 谢谢!

  • 我有一个简单的Jenkinsfile,我想在其中从工作区加载一些数据。我正在使用管道插件来利用存储库中的Jenkinsfile。构建被外包给匹配的Jenkins代理。当我尝试使用“readFile”时,我收到以下消息: Java语言io。FileNotFoundException:/path/to/jenkins/workspace/XXXXX/project/data。json(无此类文件或目录

  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我有一个将JSON消息从PubSub(未绑定的PCollection)流到Google云存储的管道。每个文件应该包含多个JSON对象,每行一个。 谢谢

  • 我想运行node。js作为子流程,并向其提供输入。使用C,下面是我的一些示例代码。 我遇到的问题是,尽管子进程的stdout仍然指向终端,但在向子进程stdin输入打印的“helloworld”行后,我什么也看不到。即使我对管道进行了fflush()操作,我也看不到任何输出。但是,如果关闭管道的输入,则“Hello World”将显示在终端上。 子流程似乎只是缓冲-为什么?我希望最终将子流程std