我有一个使用数据流流道运行的光束管道。它接受XML并输出JSON,然后将其存储在BigQuery表中。早些时候,我正在使用束管道将换行符分隔的JSON写入GCS桶,并从文件中创建BQ表,而不对其进行任何更改(使用bigQuery控制台)。作业成功运行,数据被导入到BQ中,没有任何问题。
现在我修改了管道,以便将输出JSON行直接写入BQ表。我使用的是阿帕奇光束。木卫一。WriteToBigQuery函数。Pcollections是json对象,其中每行包含一个用于BQ的对象(行)。
下面是进入WriteToBigQuery的示例输入:
{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}
我的部分代码如下:
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project_name'
google_cloud_options.job_name = 'jobid'
google_cloud_options.staging_location = 'gs://bucket/staging'
google_cloud_options.temp_location = 'gs://bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
table_spec = 'project:dataset.table'
data = (p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
#| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'COvert2json' >> beam.Map(lambda orders: json.dumps(orders))
#| beam.Map(print_row)
)
project_id = "project1"
dataset_id = 'dataset'
table_id = 'table'
table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, order_tax:FLOAT, shipping_net_price:FLOAT, order_gross_price:FLOAT, order_date:TIMESTAMP')
data| 'write' >> beam.io.WriteToBigQuery(table = table_id,dataset=dataset_id,project=project_id,schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
p.run()
运行此管道时的错误如下:
AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']
我认为错误是由于上一步的返回类型,或者与执行straming和批量加载到BigQuery相关的内容。我想在mycase中进行批量加载。我曾尝试使用ApacheBeam文档中给出的示例insert管道来编写管道工作的bigquery表。数据格式如下:
quotes = p | beam.Create([
{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
{'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
我如何修改我的管道,使字符串类型的数据在我的情况下被写入bigQuery表。
只要把它贴在这里,如果有人遇到同样的问题。这是我忽略的一个非常微小的细节。梁木卫一。WriteToBigquery()将字典作为输入。接收器部分之前的pcollection返回单个元素或字符串的列表(取决于我尝试的某些版本)。我刚刚在管道中添加了另一个步骤,使用json将json字符串转换为python字典。成功将行加载到BQ的加载。
问题内容: 我在python 3.3.4中遇到“解码”方法的问题。这是我的代码: 但是我无法解码此问题的代码: 你有什么想法?谢谢 问题答案: 一种 编码 字符串,另一种 解码 字节。 您应该从文件中读取字节并对其进行解码: 幸运的是,有一个编码参数使操作变得简单:
问题内容: 为什么被认为是物体?返回列表中的第一项,但我不能追加到列表中的第一项。谢谢。 Edit01: @pyfunc:谢谢您的解释;现在我明白了。 我需要一个清单清单。因此“来自表单”应为列表。我做到了(如果这不是正确的方法,请更正): 问题答案: myList [1]是myList的元素,其类型是字符串。 myList [1]是str,您不能附加它。myList是一个列表,您应该已经附加了它
问题内容: 有点困惑,因为我很肯定我以前曾经做过这项工作。 我创建了以下方法… 但是当我在导入的CSV文件上运行它时,会产生此错误: 尽管我可以看到这是文档中的一个属性: https://pandas.pydata.org/pandas- docs/stable/generation/pandas.Series.str.isnumeric.html?highlight=isnumeric#pand
问题内容: 我正在尝试使用SQLAlchemy + Python将一个项目添加到我的数据库中,但始终出现错误。 我的database_setup.py: 在将sqlalchemy导入到终端后,我定义了一个要插入的项目: 并绘制一个会话以添加和提交: 当我提交时,我不断收到此错误: 我在我的公司表中添加了一个“ Jawbone”对象,我理解我的“ JawboneUP3”应该与之相关。该对象是通过我通
问题内容: 我正在尝试过滤出包含产品列表的数据框。但是,我遇到了熊猫-每当我运行代码时,“ dataframe”对象都没有属性“ str”错误。 这是代码行: 如果有人有任何建议的想法,请告诉我。我已经搜索了很多次,而且非常困惑。 产品是对象数据类型。 编辑: 这是头: 编辑2:这是print(data),A是产品。当我将其打印出来时,看起来好像A不在类别产品下。 问题答案: 答案很简单: 改变成