我想读取一个csv文件,并使用apache
beam数据流将其写入BigQuery。为此,我需要以字典的形式将数据呈现给BigQuery。我该如何使用apache beam转换数据来做到这一点?
我的输入csv文件有两列,我想在BigQuery中创建随后的两列表。我知道如何在BigQuery中创建数据,这很简单,我不知道如何将csv转换为字典。下面的代码是不正确的,但应该给出我要做什么的想法。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
编辑:从2.12.0版开始,Beam附带了新的fileio
转换,使您可以从CSV读取数据而不必重新实现源代码。您可以这样做:
def get_csv_reader(readable_file):
# You can return whichever kind of reader you want here
# a DictReader, or a normal csv.reader.
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with Pipeline(...) as p:
content_pc = (p
| beam.io.fileio.MatchFiles("/my/file/name")
| beam.io.fileio.ReadMatches()
| beam.Reshuffle() # Useful if you expect many matches
| beam.FlatMap(get_csv_reader))
我最近为Apache
Beam编写了一个测试。您可以查看Github存储库。
旧的答案 依赖于重新实现源。这已不再是推荐的主要方法:)
想法是有一个返回已解析的CSV行的源。您可以通过对FileBasedSource
类进行子类化以包括CSV解析来实现。特别是,该read_records
函数将如下所示:
class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.reader(self._file)
for rec in reader:
yield rec
问题内容: //或将多部分文件保存到数据库的任何其他解决方案。我尝试用这种方式,但出现错误。 问题答案:
我正在尝试创建一个简单的解析util,它转换一个两列CSV文件并将其放入一个映射。 如您所见,我正在创建一个字符串流,用逗号分隔每一行,并将其转换为字符串数组,最后将键映射到索引0,将值映射到索引1。 出于某种原因,当我运行这个测试时,实际值为null。我排除了无效的文件路径,因为它在另一个单元测试中运行良好,并且键值出现在CSV中。我已经盯着它看了几个小时了,我想也许有人能指出我的错误。 此外,
我的代码 这是当前折线图的样子:
问题内容: Python中如何将输入数据转换为数字? 问题答案: Python 2.x 有两个函数用于获取用户输入,分别称为和。它们之间的区别是,不评估数据并以字符串形式原样返回。但是,将对你输入的内容进行评估,评估结果将返回。例如, 5 + 17评估数据,结果为22。当它对表达式求值时5 + 17,它将检测到你要添加两个数字,因此结果也将是同一int类型。因此,类型转换是免费完成的,并22作为的
问题内容: 如何将数组转换为CSV文件? 这是我的数组: 问题答案: 我正在使用以下功能;它是对fputscsv注释中的man条目之一的改编。而且您可能想要展平该数组;不知道如果您传递一个多维的行会发生什么。
问题内容: 我有一个要转换为CSV文件的JSON文件。如何使用Python执行此操作? 我试过了: 但是,它没有用。我正在使用Django,收到的错误是: 然后,我尝试了以下方法: 然后我得到错误: 样本json文件: 问题答案: 首先,你的JSON具有嵌套对象,因此通常无法直接将其转换为CSV。你需要将其更改为以下内容: 这是从中生成CSV的代码: 你将获得以下输出: