当前位置: 首页 > 知识库问答 >
问题:

如何在同一管道中使用ApacheBeamPython作业从BigQuery和文件系统读取数据?

祁嘉瑞
2023-03-14

我正在尝试使用下面的代码从Bigquery读取一些数据,并从文件系统读取一些数据。

apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

但是,当我运行这个管道时,我得到了以下错误

Traceback(最近一次调用):File"/etl/dataflow/etlTXLPreprocessor.py",第125行,在run()File"/etl/dataflow/etlTXLPreprocessor.py",第120行,在runp.run()中。wait_until_finish()File"/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py",第461行,在运行自._options). run(False)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/pipeline.py",第474行,在运行中返回self.runner.run_pipeline._options)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/runner/Direct/direct_runner.py",第182行,run_pipeline返回runner.run_pipeline(管道,选项)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/runner/Direct/direct_runner.py",第413行,run_pipelinepipeline.replace_all(_get_transform_overrides(选项))File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/pipeline.py",第443行,replace_all。_replace(重写)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/pipeline.py",第340行,_replaceself.visit(TransformUpdater(自))File"/etl/dataflow/venv3/lib/python3.7/site-pack/apache_beam/pipeline.py",第503行,访问自._root_transform(). access(访问者,自,访问过)File"/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py",第939行,在访问part.visit(访问者,管道,访问过的)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/pipeline.py",第939行,在访问part.visit(访问者,管道,访问过)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/pipeline.py",第939行,在访问part.visit(访问者,管道,访问过)File"/etl/dataflow/venv3/lib/python3.7/site-包/apache_beam/管道. py",第942行,在访问访问者中。visit_transform(自己)文件"/etl/dataflow/venv3/lib/python3.7/site-Packers/apache_beam/管道. py",第338行,visit_transform自己。_replace_if_needed(transform_node)文件"/etl/dataflow/venv3/lib/python3.7/site-pack/apache_beam/管道. py",第301行,_replace_if_needednew_output=replacement_transform。展开(input_node)文件"/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py",第87行,在展开调用者=DoFnInvoker中。create_invoker(签名,process_invocation=False)文件"apache_beam/runner/通用. py",第360行,在apache_beam. runners.通用中。create_invokerTypeError:create_invoker()至少需要2个位置参数(1个给定)

但是如果我这样运行我的代码

apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
apn1 = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()

还是像这样

preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
preprocess_rows1 = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

我无法找出错误。在ApacheBeam管道中从同一数据源读取是否存在限制?

共有2个答案

田嘉澍
2023-03-14

这是Apache Beam v2中direct runner中的一个错误。19.修复已完成,但尚未发布。将apachebeam降级到2.16(pip install apachebeam==2.16),它就可以工作了。

芮立果
2023-03-14

当执行相同类型的操作时,我得到了相同的错误,从BigQuery和文件系统中提取数据。

lines = p | "Read Input Parameters" >> ReadFromText(options.input)
past_posts = p | "Get Past Posts From BigQuery" >> Read(BigQuerySource(query=f"SELECT url FROM {full_bq_table_id}", use_standard_sql=False))

错误:

回溯(最后一次调用):文件“/usr/local/cillar/python/3.7.4/Frameworks/python.framework/Versions/3.7/lib/python3.7/runpy.py”,第193行,在“运行”模块作为“主”模块,mod\u spec)文件“/usr/local/cillar/python/3.7.4/Frameworks/python.Frameworks/Versions/3.7/lib/python3.7/runpy.py.py”,第85行,在“运行”代码执行(代码,运行全局)文件中“/Users/ianmitchell/Documents/Personal Projects/Craigslist/Craigslist_pipeline。py”,第14行,在完整的“公寓数据项目:{dataset}”中。craigslist_发布“文件”/Users/ianmitchell/Documents/Personal Projects/craigslist/pipeline/init。py“,第35行,在run result=p.run()文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3中。7/现场包/阿帕奇梁/管道。py“,第461行,运行中的self._options)。运行(False)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py”,第474行,运行中返回self.runner.run_管道(self,self._选项)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/站点包/apache_梁/runner/direct/direct_runner。py“,第182行,在run_pipeline return runner.run_pipeline(pipeline,options)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3中。7/站点包/apache_梁/runner/direct/direct_runner。py“,第413行,在运行管道中。替换所有(_get_transform_overrides(options))文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py“,第443行,在replace_all self._replace(override)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3中。7/现场包/阿帕奇梁/管道。py“,第340行,在_replaceself.visit(TransformUpdater(self))文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3中。7/现场包/阿帕奇梁/管道。py“,第503行,在访问self.\u root_transform().visit(visitor,self,visted)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3中。7/现场包/阿帕奇梁/管道。py”,第939行,访问部分。访问(访问者,管道,访问)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py”,第939行,访问部分。访问(访问者,管道,访问)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py”,第939行,访问部分。访问(访问者,管道,访问)[前一行重复1次]文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py”,第942行,访问访问者。访问转换(自我)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py”,第338行,访问转换自我。如果需要,替换(转换节点)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/现场包/阿帕奇梁/管道。py“,第301行,如果需要新的替换输出=替换转换。展开(输入节点)文件“/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3。7/站点包/apache_beam/runners/direct/sdf_direct_runner。py”,第87行,在expand invoker=DoFnInvoker.create_invoker(签名,process_invoke=False)文件“apache_beam/runners/common”中。py“,第360行,在apache_beam.runners.common.DoFnInvoker.create_invoker TypeError:create_invoker()至少接受2个位置参数(给定1个)

想知道为什么你不能从不同的来源也拉。

 类似资料:
  • 我使用expo下载了一张图片(a.jpg),代码如下: 文件成功保存在文件系统中。后来当我试图读取文件时,我得到一个错误,文件无法读取。用于读取文件的代码: 上面的代码返回文件无法读取的错误。fileInfo.exists是true,因为文件存在于文件系统中。 读取文件时出错: 如果我尝试读取一个文本文件(a.json),而不是jpg(a.jpg),那么一切都很好。所以,文件系统。readAsSt

  • 我有一个python中的ApacheBeam管道,不管出于什么原因,它都有下面这样的流。 SQL作业-- 当我在本地运行此程序时,此序列工作正常。然而,当我试图将其作为数据流管道运行时,它实际上并没有按此顺序运行。 在数据流上运行时是否有强制依赖关系的方法?

  • 我有一个Jenkins的管道作业,它使用中的秘密值: 秘密值作为存储在凭据管理器中,ID为。我通过引用凭据管理器中的秘密文本来参数化我的Jenkins管道作业: 我试图在Jenkinsfile中读取此值,如上所示,但得到以下输出: 它只打印凭据的ID,而不是值。我认为这可能是一种Jenkins的安全机制,但是当我尝试对Freestyle作业做类似的事情时,我得到了一个屏蔽输出()。 但是,如果我在

  • 我想通过JobDSL生成基于管道插件的作业,该作业包含在Jenkins签出的git存储库中。 然而,我认为在作业DSL脚本中使用引用字符串的管道脚本不是很好。因此,我想将它们读入一个字符串,并将其传递给函数: 我必须把放在哪里才能工作?我试着把它放在我的DSL脚本旁边,也放在我的DSL源代码的文件夹中。但是Jenkins总是抛出一个“未找到文件”。

  • 当试图安排作业时,我们需要来自Azkaban的作业名称。有什么内置属性吗?我们从获取流名称。 我的工作文件是:

  • 问题内容: 我正在尝试编写管道脚本以与Jenkins 2.0一起使用来复制我们现有的构建。这个原始版本使用envInject插件读取Java属性文件,但是我看不到如何从管道Groovy脚本中执行此操作。我已经用Google搜索并找到了以下内容,但是它不起作用(FileNotFoundException): 谢谢! 问题答案: 我只是昨天和今天为此而战。我希望此功能更容易找到。 抓住“ Pipeli