当前位置: 首页 > 知识库问答 >
问题:

Google数据流作业在writeToBiqquery步骤中失败:“list”对象和“str”对象没有属性“items”

秦新立
2023-03-14

我有一个使用数据流流道运行的光束管道。它接受XML并输出JSON,然后将其存储在BigQuery表中。早些时候,我正在使用束管道将换行符分隔的JSON写入GCS桶,并从文件中创建BQ表,而不对其进行任何更改(使用bigQuery控制台)。作业成功运行,数据被导入到BQ中,没有任何问题。

现在我修改了管道,以便将输出JSON行直接写入BQ表。我使用的是阿帕奇光束。木卫一。WriteToBigQuery函数。Pcollections是json对象,其中每行包含一个用于BQ的对象(行)。

下面是进入WriteToBigQuery的示例输入:

{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}

我的部分代码如下:

from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def run(argv = None):

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)  
    google_cloud_options.project = 'project_name'
    google_cloud_options.job_name = 'jobid'
    google_cloud_options.staging_location = 'gs://bucket/staging'
    google_cloud_options.temp_location = 'gs://bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'


    p = beam.Pipeline(options=options)

    table_spec = 'project:dataset.table'

    data = (p
        | 'Create' >> beam.Create([input_file_path])
        | 'GetXML' >> beam.ParDo(ReadGCSfile())
        #| 'Convert2JSON' >> beam.ParDo(converttojson())
        | 'COvert2json' >> beam.Map(lambda orders: json.dumps(orders))
        #| beam.Map(print_row)
        )


    project_id = "project1"
    dataset_id = 'dataset'
    table_id = 'table'
    table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, order_tax:FLOAT, shipping_net_price:FLOAT, order_gross_price:FLOAT, order_date:TIMESTAMP')

    data| 'write' >> beam.io.WriteToBigQuery(table = table_id,dataset=dataset_id,project=project_id,schema=table_schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                    )

    p.run()

运行此管道时的错误如下:

AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']

我认为错误是由于上一步的返回类型,或者与执行straming和批量加载到BigQuery相关的内容。我想在mycase中进行批量加载。我曾尝试使用ApacheBeam文档中给出的示例insert管道来编写管道工作的bigquery表。数据格式如下:

quotes = p | beam.Create([
    {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
    {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])

我如何修改我的管道,使字符串类型的数据在我的情况下被写入bigQuery表。

共有1个答案

堵琨
2023-03-14

只要把它贴在这里,如果有人遇到同样的问题。这是我忽略的一个非常微小的细节。梁木卫一。WriteToBigquery()将字典作为输入。接收器部分之前的pcollection返回单个元素或字符串的列表(取决于我尝试的某些版本)。我刚刚在管道中添加了另一个步骤,使用json将json字符串转换为python字典。成功将行加载到BQ的加载。

 类似资料:
  • 问题内容: 我在python 3.3.4中遇到“解码”方法的问题。这是我的代码: 但是我无法解码此问题的代码: 你有什么想法?谢谢 问题答案: 一种 编码 字符串,另一种 解码 字节。 您应该从文件中读取字节并对其进行解码: 幸运的是,有一个编码参数使操作变得简单:

  • 问题内容: 为什么被认为是物体?返回列表中的第一项,但我不能追加到列表中的第一项。谢谢。 Edit01: @pyfunc:谢谢您的解释;现在我明白了。 我需要一个清单清单。因此“来自表单”应为列表。我做到了(如果这不是正确的方法,请更正): 问题答案: myList [1]是myList的元素,其类型是字符串。 myList [1]是str,您不能附加它。myList是一个列表,您应该已经附加了它

  • 问题内容: 有点困惑,因为我很肯定我以前曾经做过这项工作。 我创建了以下方法… 但是当我在导入的CSV文件上运行它时,会产生此错误: 尽管我可以看到这是文档中的一个属性: https://pandas.pydata.org/pandas- docs/stable/generation/pandas.Series.str.isnumeric.html?highlight=isnumeric#pand

  • 问题内容: 我正在尝试使用SQLAlchemy + Python将一个项目添加到我的数据库中,但始终出现错误。 我的database_setup.py: 在将sqlalchemy导入到终端后,我定义了一个要插入的项目: 并绘制一个会话以添加和提交: 当我提交时,我不断收到此错误: 我在我的公司表中添加了一个“ Jawbone”对象,我理解我的“ JawboneUP3”应该与之相关。该对象是通过我通

  • 问题内容: 我正在尝试过滤出包含产品列表的数据框。但是,我遇到了熊猫-每当我运行代码时,“ dataframe”对象都没有属性“ str”错误。 这是代码行: 如果有人有任何建议的想法,请告诉我。我已经搜索了很多次,而且非常困惑。 产品是对象数据类型。 编辑: 这是头: 编辑2:这是print(data),A是产品。当我将其打印出来时,看起来好像A不在类别产品下。 问题答案: 答案很简单: 改变成