Question:

Writing to BigQuery from within a ParDo function

邴英毅
2023-03-14

I would like to call a beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function, in order to generate a separate BigQuery table for each key in the PCollection (I am using the Python SDK). Here are two similar threads, which unfortunately did not help:

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

When I run the following code, the rows for the first key are inserted into BigQuery and then the pipeline fails with the error below. I would greatly appreciate any suggestions on what I am doing wrong, or on how to fix it.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
                        beam.io.BigQuerySink(
                            'PROJECT-NAME:analytics.first_table', # will be replaced by a dynamic name based on the key
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                            )
            )
        ### End block ######
        return [value] 


### Following part works fine ###
filtered = (rows    | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
                    | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
                    | 'group_by_key' >> beam.GroupByKey() 
                    | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
                    | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing
                )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.another_table',
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
  File "split_events.py", line 137, in <module>
    run()
  File "split_events.py", line 132, in run
    p.run()
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
    runner.process(v)
  File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
  File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
  File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
  File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
    return self.run(self.dofn.process, context, args, kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
    result = method(context, *args, **kwargs)
  File "split_events.py", line 73, in process
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
    return _MaterializePValues(cache).visit(result)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
    return self._pvalue_cache.get_unwindowed_pvalue(node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
    return [v.value for v in self.get_pvalue(pvalue)]
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"
I also tried creating a separate pipeline for each key inside the ParDo instead:

key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))

The pipeline then fails with the following error:

    JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
    op.start()
  File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
    def start(self):
  File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)

1 Answer

何琨
2023-03-14

In those similar threads, the only suggestion for performing a BigQuery write inside a ParDo was to use the BigQuery API directly, or to use the client library.

The code you have written places the Dataflow transform beam.io.BigQuerySink() inside a DoFn. A Write transform expects to be applied to a PCollection, like filtered in your working example; that is not the case for the failing block, which tries to apply it to value.
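As an aside, and going beyond the original answer: newer Beam Python SDKs can do this per-key routing at pipeline level, because beam.io.WriteToBigQuery accepts a callable table argument that is evaluated for each element. A minimal sketch, assuming a recent SDK; the query, the schema, and the use of the topic field as the table name are placeholders, not taken from the question:

import apache_beam as beam

# Sketch only: per-element table routing via a callable `table` argument.
schema = 'field_a:STRING,field_b:INTEGER'  # placeholder schema

with beam.Pipeline() as p:
    rows = p | 'read_bq_table' >> beam.io.ReadFromBigQuery(
        query='SELECT field_a, field_b, topic FROM analytics.source_table',  # hypothetical query
        use_standard_sql=True)
    rows | 'write_dynamic' >> beam.io.WriteToBigQuery(
        # The callable receives each element and returns its destination table.
        table=lambda row: 'PROJECT-NAME:analytics.%s' % row['topic'],
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)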

That said, I think the simplest option here is to look at the gcloud-python BigQuery function insert_data() and call it from inside your ParDo.
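A minimal sketch of that approach, assuming today's google-cloud-bigquery client, whose insert_rows_json() has replaced the insert_data() method mentioned above. The class name WritePerKeyToBigQuery is mine, not from the question, and streaming inserts assume the destination tables already exist:

import apache_beam as beam
from google.cloud import bigquery

class WritePerKeyToBigQuery(beam.DoFn):
    # Streams each (key, rows) pair from GroupByKey into its own table by
    # calling the BigQuery client directly, instead of nesting beam.io.Write.
    def __init__(self, project, dataset):
        self.project = project
        self.dataset = dataset
        self.client = None

    def start_bundle(self):
        # Build the client on the worker: client objects are not picklable.
        if self.client is None:
            self.client = bigquery.Client(project=self.project)

    def process(self, element):
        key, rows = element
        table_id = '%s.%s.%s' % (self.project, self.dataset, key)
        # insert_rows_json() expects a list of dicts; it does not create tables.
        errors = self.client.insert_rows_json(table_id, list(rows))
        if errors:
            raise RuntimeError('BigQuery insert errors: %s' % errors)
        yield element

In the question's pipeline this would replace beam.ParDo(par_upload()) after the 'group_by_key' step, e.g. 'par_upload_to_bigquery' >> beam.ParDo(WritePerKeyToBigQuery('PROJECT-NAME', 'analytics')).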
