当前位置: 首页 > 面试题库 >

如何将CSV转换为Apache Beam数据流中的字典

融修平
2023-03-14
问题内容

我想读取一个csv文件,并使用apache
beam数据流将其写入BigQuery。为此,我需要以字典的形式将数据呈现给BigQuery。我该如何使用apache beam转换数据来做到这一点?

我的输入csv文件有两列,我想在BigQuery中创建随后的两列表。我知道如何在BigQuery中创建数据,这很简单,我不知道如何将csv转换为字典。下面的代码是不正确的,但应该给出我要做什么的想法。

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
   beam.io.BigQuerySink(
   output_table,
   schema='month:INTEGER, tornado_count:INTEGER',
   create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
   write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

问题答案:

编辑:从2.12.0版开始,Beam附带了新的fileio转换,使您可以从CSV读取数据而不必重新实现源代码。您可以这样做:

def get_csv_reader(readable_file):
  # You can return whichever kind of reader you want here
  # a DictReader, or a normal csv.reader.
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with Pipeline(...) as p:
  content_pc = (p
                | beam.io.fileio.MatchFiles("/my/file/name")
                | beam.io.fileio.ReadMatches()
                | beam.Reshuffle()  # Useful if you expect many matches
                | beam.FlatMap(get_csv_reader))

我最近为Apache
Beam编写了一个测试。您可以查看Github存储库。

旧的答案 依赖于重新实现源。这已不再是推荐的主要方法:)

想法是有一个返回已解析的CSV行的源。您可以通过对FileBasedSource类进行子类化以包括CSV解析来实现。特别是,该read_records函数将如下所示:

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)

    reader = csv.reader(self._file)

    for rec in reader:
      yield rec


 类似资料:
  • 问题内容: //或将多部分文件保存到数据库的任何其他解决方案。我尝试用这种方式,但出现错误。 问题答案:

  • 我正在尝试创建一个简单的解析util,它转换一个两列CSV文件并将其放入一个映射。 如您所见,我正在创建一个字符串流,用逗号分隔每一行,并将其转换为字符串数组,最后将键映射到索引0,将值映射到索引1。 出于某种原因,当我运行这个测试时,实际值为null。我排除了无效的文件路径,因为它在另一个单元测试中运行良好,并且键值出现在CSV中。我已经盯着它看了几个小时了,我想也许有人能指出我的错误。 此外,

  • 我的代码 这是当前折线图的样子:

  • 问题内容: Python中如何将输入数据转换为数字? 问题答案: Python 2.x 有两个函数用于获取用户输入,分别称为和。它们之间的区别是,不评估数据并以字符串形式原样返回。但是,将对你输入的内容进行评估,评估结果将返回。例如, 5 + 17评估数据,结果为22。当它对表达式求值时5 + 17,它将检测到你要添加两个数字,因此结果也将是同一int类型。因此,类型转换是免费完成的,并22作为的

  • 问题内容: 如何将数组转换为CSV文件? 这是我的数组: 问题答案: 我正在使用以下功能;它是对fputscsv注释中的man条目之一的改编。而且您可能想要展平该数组;不知道如果您传递一个多维的行会发生什么。

  • 问题内容: 我有一个要转换为CSV文件的JSON文件。如何使用Python执行此操作? 我试过了: 但是,它没有用。我正在使用Django,收到的错误是: 然后,我尝试了以下方法: 然后我得到错误: 样本json文件: 问题答案: 首先,你的JSON具有嵌套对象,因此通常无法直接将其转换为CSV。你需要将其更改为以下内容: 这是从中生成CSV的代码: 你将获得以下输出: