当前位置: 首页 > 知识库问答 >
问题:

Apache Beam作为参数写入BigQuery表和模式

吕征
2023-03-14

我正在为Apache Beam使用Python SDK。datatable和架构的值在pCollection中。这是我从PubSub上读到的消息:

{"DEVICE":"rms005_m1","DATESTAMP":"2020-05-29 20:54:26.733 UTC","SINUMERIK__x_position":69.54199981689453,"SINUMERIK__y_position":104.31400299072266,"SINUMERIK__z_position":139.0850067138672}

然后,我想使用json消息中的值将其写入BigQuery,其中lambda函数用于datatable,此函数用于模式:

def set_schema(data):
    list = []
    for name in data:
        if name == 'STATUS' or name == 'DEVICE':
            type = 'STRING'
        elif name == 'DATESTAMP':
            type = 'TIMESTAMP'
        else:
            type = 'FLOAT'
        list.append(name + ':' + type)
    schema = ",".join(list)
    return schema

data = (p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=topic)
        | "Parse json" >> beam.Map(json_parse)
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table='project:dataset{datatable}__opdata'.format(datatable = lambda element: element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
       )

当我执行它时,会出现以下错误:

ValueError: Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of project:dataset.<function <lambda> at 0x7fa0dc378710>__opdata

如何将PCollection的值用作pTransform中的变量?

共有1个答案

袁轶
2023-03-14

您必须将函数传递到表中。请尝试以下操作:

| "Write to BQ" >> beam.io.WriteToBigQuery(
            table=lambda element: 'project:dataset{datatable}__opdata'.format(datatable = element["DEVICE"]),
            schema=set_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
 类似资料:
  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 我希望从ParDo函数中调用操作,为中的每个键生成单独的BigQuery表(我使用的是python SDK)。这里有两个类似的线程,不幸的是没有帮助: 1)https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 当我执行以下代码时,第一个键的行被插入到BigQuery,然后管道失败,出现以下错误

  • 我有一个数据流作业要写入BigQuery。它对于非嵌套模式很好,但是对于嵌套模式却失败了。 下面是我的数据流管道: 我使用以下模式创建了BigQuery表: 我得到以下错误: 有人能给我指路吗?我做错了什么?此外,如果有更好的方法迭代所有嵌套模式并写入BigQuery,请建议? 其他信息我的数据文件:

  • 问题内容: 使用JAX-RS和(java8)时出现问题。 我想使用JSON将这样的对象传递到JAX-RS方法中: 我得到的异常是: :没有合适的构造找到型[简单的类型,类在[来源不能实例从JSON对象(需要添加/启用类型信息)::] ; 行:2,列:3] 如何创建某种将json-dates映射到的拦截器?我尝试实现a ,但是如果is是另一个类中的 字段 ,则我必须为每个持有a的类编写一个(据我所知

  • 使用JAX-RS和(java8)时出现问题。 我想使用JSON将这样一个对象传递到JAX-RS方法中: 我得到的例外是:

  • 我想创建一个表,然后使用云函数写入bigquery,但是我不想复制表中的数据,所以我先删除表,然后在每次调用函数时创建表。 所以错误是当我首先删除表时,当它被重新创建以写入时,插入所有无法找到表我得到了这个错误:表abc.abc_names找不到