当前位置: 首页 > 知识库问答 >
问题:

云数据流写入 BigQuery Python 错误

彭弘伟
2023-03-14

我正在编写一个简单的 Beam 作业,将数据从 GCS 存储桶复制到 BigQuery。代码如下所示:

from apache_beam.options.pipeline_options import GoogleCloudOptions
import apache_beam as beam

pipeline_options = GoogleCloudOptions(flags=sys.argv[1:])
pipeline_options.project = PROJECT_ID
pipeline_options.region = 'us-west1'
pipeline_options.job_name = JOB_NAME
pipeline_options.staging_location = BUCKET + '/binaries'
pipeline_options.temp_location = BUCKET + '/temp'

schema = 'id:INTEGER,region:STRING,population:INTEGER,sex:STRING,age:INTEGER,education:STRING,income:FLOAT,statusquo:FLOAT,vote:STRING'
p = (beam.Pipeline(options = pipeline_options)
     | 'ReadFromGCS' >> beam.io.textio.ReadFromText('Chile.csv')
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('project:tmp.dummy', schema = schema))

我们在项目项目中写入tmp.dmmy表的位置。这将导致以下堆栈跟踪:

Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 151, in _run_module_as_main
    mod_name, loader, code, fname = _get_module_details(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 101, in _get_module_details
    loader = get_loader(mod_name)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 464, in get_loader
    return find_loader(fullname)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 474, in find_loader
    for importer in iter_importers(fullname):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pkgutil.py", line 430, in iter_importers
    __import__(pkg)
  File "WriteToBigQuery.py", line 49, in <module>
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(str(PROJECT_ID + ':' + pipeline_options.write_file), schema = schema))
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1337, in __init__
    self.table_reference = _parse_table_reference(table, dataset, project)
  File "/Users/mayansalama/Documents/GCP/gcloud_env/lib/python2.7/site-packages/apache_beam/io/gcp/bigquery.py", line 309, in _parse_table_reference
    if isinstance(table, bigquery.TableReference):
AttributeError: 'module' object has no attribute 'TableReference'

看起来有些进口商品在某个地方出了问题;这是否可能是使用GoogleCloudOptions管道选项导致的?

共有2个答案

姚伟
2023-03-14

我做了一些测试,无法重现您的问题,数据集是否已经存在?。以下片段对我有用(我使用答案以更好地格式化):

import apache_beam as beam
import sys

PROJECT='PROJECT_ID'
BUCKET='BUCKET_NAME'
schema = 'id:INTEGER,region:STRING'

class Split(beam.DoFn):

    def process(self, element):
        id, region = element.split(",")

        return [{
            'id': int(id),
            'region': region,
        }]

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)

   (p
      | 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/staging/dummy.csv'.format(BUCKET))
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:test.dummy'.format(PROJECT), schema=schema)
   )

   p.run()

if __name__ == '__main__':
   run()

其中dumm.csv包含:

$ cat dummy.csv 
1,us-central1 
2,europe-west1 

BigQuery中的输出为:

使用的一些相关依赖项:

apache-beam==2.4.0
google-cloud-bigquery==0.25.0
google-cloud-dataflow==2.4.0
米楷
2023-03-14

我也犯了同样的错误。我意识到我安装了错误的apache beam包。安装apachebeam时,需要在包名中添加[gcp]。

sudo pip install apache_beam[gcp]

还有一些可选的安装来修复安装错误,你很好。

sudo pip install oauth2client==3.0.0
sudo pip install httplib2==0.9.2
 类似资料:
  • 我试图用Cloud Dataflow(Beam Python SDK)将它读写到BigQuery。 读写2000万条记录(约80 MBs)几乎需要30分钟。 查看dataflow DAG,我可以看到将每个CSV行转换为BQ行花费了大部分时间。

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 我将多个文本字段绑定到IDE Rapidclipse中的一个实体,该实体使用Vaadin14和JPA/Hibernate。其目的是将按钮点击时的输入数据写入定义为DataSource的引用数据库中。我已经发现这个文档是手动编写的。所以我对此的理解是这样的:首先,我需要创建一个实体的新bean()。那么所有输入数据都应该分配给bean()的属性。文档就这么多了。但是数据是如何插入到数据库中实体的表中

  • 我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0

  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢

  • 没有办法(不管它有多“黑”)检测Java的已写入,以便在发生这种情况时能够执行逻辑?-我目前正在使用(我们称之为)的一个自定义子类,它在