当前位置: 首页 > 知识库问答 >
问题:

使用Google Cloud DataFlow python sdk读取一组xml文件

傅阿苏
2023-03-14

我试图从GCS bucket中读取XML文件集合并对其进行处理,其中集合中的每个元素都是表示整个文件的字符串,但我找不到如何实现这一点的像样示例,也无法从Apache Beam文档中理解它,该文档主要是关于Java版本的。

我当前的管道如下所示:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

我收到的错误信息是:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals)  # execute the script

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'

非常感谢任何帮助。谢谢托默

(p
 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish() 
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)

(p
 | 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
 | 'metadata_list to filepath' >> beam.FlatMap(lambda metadata_list: [metadata.path for metadata in metadata_list])
 | 'string To BigQuery Row' >> beam.Map(lambda filepath:
                                        data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Appends data to the BigQuery table
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()

代码使用这个帮助器类读取gcs文件:

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

  def get_string_from_filepath(self,filepath):
      with self.gcs.open(filepath) as reader:
          res = reader.read()

      return res

共有1个答案

厍光霁
2023-03-14

ReadFromText在给定路径中逐行读取文件。您需要的是一个文件列表,然后在ParDo中使用GcsFileSystem html" target="_blank">https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/GcsFileSystem.py一次读取一个文件,然后将内容写入BigQuery。

您也可以参考类似主题https://lists.apache.org/thread.html/85da22a845cef8edd942fcc4906a7b47040a4ae8e10aef4ef00be233@%3cuser.beam.apache.org%3e的邮件线程

 类似资料:
  • 导语 XML(ExtensibleMarkup Language,可扩展标记语言),是一种类似于HTML的标记语言,但它的设计目的是用来传输数据,而不是显示数据。XML的标签没有被预定义,用户需要在使用时自行进行定义。XML是W3C(万维网联盟)的推荐标准。相对于数据库表格的二维表示,XML使用的树形结构更能表现出数据的包含关系,作为一种文本文件格式,XML简单明了的特性使得它在信息存储和描述领域

  • 问题内容: 我想读取一个xml文件(如下所示),但是我得到了执行权。您能帮我解决这个问题吗? 这是我想要读取xml文件的代码: } 最后,这是我得到的异常: Env POJO类别: POJO类别: 问题答案: 您需要确保使用或将类与XML文档的根元素相关联(请参阅:http : //blog.bdoughan.com/2012/07/jaxb-and-root- elements.html )。或

  • 问题内容: EMF = Eclipse建模框架 我必须在一个课堂项目中使用EMF。我正在尝试了解如何使用EMF执行以下操作: 读取XML, 将值放入对象。 使用ORM将对象中的值持久保存到数据库中。-完成 使用ORM从数据库获取数据并生成XML。 我需要使用EMF(不知道是什么)和JPA(完成)来完成所有这些操作。 我使用过JAXB,我知道,可以使用JAXB完成,但是(EMF == JAXB)怎么

  • 问题内容: 我需要使用Java读取XML文件。它的内容就像 是否有特殊的阅读器/ JAR,还是应该使用 FileInputStream进行 阅读? 问题答案: 另一个建议:尝试使用Commons消化器。这使您可以使用基于规则的方法非常快速地开发解析代码。有一个教程在这里和图书馆可在这里 我也同意Brian和Alzoid的观点,因为JAXB非常适合快速启动和运行。您可以使用JDK附带的xjc绑定编译

  • 我试图从XML文件中读入一些数据,但遇到了一些问题,我的XML如下所示: 我试图将这些值作为字符串读入Java程序,到目前为止,我已经编写了以下代码: 我正在努力读取和打印id、用户名等的值。

  • 我在三个阶段阅读xml文件,在每个阶段,我对不同的元素感兴趣,基于输入参数。 使用XMLStreamReader多次读取一个xml文件的最佳方法是什么? 其中inputStream是FileInputStream实例 目前,我得到了StreamClosed异常或streamReader。当我开始第二阶段读取时,hasNext()为false。