我尝试运行一个数据流管道,使用DirectPipelineRunner从本地计算机(windows)读取数据,并写入Google云存储。作业失败,出现以下指定FileNotFoundException的错误(因此我认为数据流作业无法读取我的位置)。我正在本地计算机上运行作业,以运行我创建的基于GCP的模板。我可以在GCP数据流仪表板中看到它,但由于以下错误而失败。请帮忙。我还尝试了本地机器的IP或主机名以及本地位置,但遇到了FileNotFoundException?
错误:
java.io.FileNotFoundException: No files matched spec: C:/data/sampleinput.txt
at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
at org.apache.beam.sdk.io.FileBasedSource.split(FileBasedSource.java:261)
at com.google.cloud.dataflow.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:275)
用于运行模板的命令:
gcloud dataflow jobs run jobname --gcs-location gs://<somebucketname of template>/<templatename> --parameters inputFilePattern=C:/data/sampleinput.txt,outputLocation=gs://<bucketname>/output/outputfile,runner=DirectPipelineRunner
代码:
PCollection<String> textData =pipeline.apply("Read Text Data", TextIO.read().from(options.getInputFilePattern()));
textData.apply("Write Text Data",TextIO.write().to(options.getOutputLocation()));
gcloud dataflow jobs run
命令在云数据流上运行作业。这意味着数据流工作人员将尝试查找C:/data/sampleinput。txt
,显然这些工人身上不存在。
您可以通过上传sampleinput来修复此问题。txt
到存储桶,并提供URIgs://
我已经使用Google云数据流SDK编写了一个流式管道,但我想在本地测试我的管道。我的管道从Google Pub/Sub获取输入数据。 是否可以使用DirectPipelineRunner(本地执行,而不是在Google云中)运行访问发布/订阅(pubsubIO)的作业? 我在以普通用户帐户登录时遇到权限问题。我是项目的所有者,我正在尝试访问发布/子主题。
我试图读取一个csv文件目前在谷歌云存储桶到熊猫数据帧。 它显示以下错误消息: 我做错了什么,我无法找到任何不涉及谷歌数据实验室的解决方案?
我需要从压缩的GCS文件中解析json数据,因为文件扩展名是。gz,所以它应该由dataflow正确地重新组织和处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作得很好。我使用以下方法映射/解析JSON: 你知道原因是什么吗? 运行时的配置: 输入文件名示例:file.gz,命令gsutil ls-l gs://bucket/input/file.gz grep c
我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。
我为API调用创建了一个Python函数,因此我不再需要在Power BI中这样做。它创建了5个XML文件,然后将它们合并成一个CSV文件。我希望该功能在谷歌云上运行(如果这不是一个好主意,请纠正我)。 我认为不可能在函数中创建XML文件(也许可以写入bucket),但理想情况下,我希望跳过XML文件的创建,直接创建CSV。 请在下面找到生成XML文件并合并为CSV的代码: 到csv 有什么想法吗
结果如何在工作人员之间分配?是使用查询结果创建一个表,工作人员从中读取页面,还是每个工作人员运行查询并读取不同的页面或。。。怎样