当前位置: 首页 > 面试题库 >

如何在python中使用pyarrow从S3读取分区实木复合地板文件

翟修明
2023-03-14
问题内容

我正在寻找使用python从s3的多个分区目录中读取数据的方法。

data_folder / serial_number = 1 / cur_date = 20-12-2012 /
abcdsd0324324.snappy.parquet data_folder / serial_number = 2 / cur_date =
27-12-2012 / asdsdfsd0324324.snappy.parquet

pyarrow的ParquetDataset模块具有读取分区的功能。所以我尝试了以下代码:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

它引发了以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

根据pyarrow的文档,我尝试使用s3fs作为文件系统,即:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

抛出以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

我仅限使用ECS集群,因此 spark / pyspark不是一个选择

有没有一种方法可以让我们轻松地从s3中的此类分区目录中轻松读取python中的实木复合地板文件?我觉得列出所有目录然后阅读并不是本链接中建议的一种好习惯。我需要将读取的数据转换为熊猫数据框以进行进一步处理,因此需要与fastparquet或pyarrow相关的选项。我也对python中的其他选项持开放态度。


问题答案:

我设法使它与最新版本的fastparquet&s3fs一起使用。下面是相同的代码:

import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet 
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

感谢马丁通过我们的对话为我指明了正确的方向

注意 :根据基准,这比使用pyarrow慢。一旦通过ARROW-1213在pyarrow中实现了s3fs支持,我将更新我的答案

我使用pyarrow对单个迭代进行了快速基准测试,并将文件列表作为全局文件发送到fastparquet。使用s3fs和pyarrow
+我的hackish代码,fastparquet更快。但是我认为pyarrow + s3fs一旦实施便会更快。

代码和基准如下:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

更新2019

在完成所有PR之后,诸如Arrow-2038和Fast
Parquet-PR#182之类的问题已解决。

使用Pyarrow读取镶木地板文件

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' #if its a directory omit the traling /
>>> bucket_uri = f's3://{bucket}/{path}'
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()

使用快速拼花板读取拼花板文件

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wild card represents the 1st,2nd column partitions columns of your data & so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()

快速基准

这可能不是基准测试的最佳方法。请阅读博客文章以获得通过基准测试

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

有关Pyarrow速度的更多阅读

参考:

  • 快速镶木地板
  • s3fs
  • 罂粟
  • 基于讨论和文档的pyarrow箭头代码
  • 基于讨论PR-182,PR-182和文档的fastparquet代码


 类似资料:
  • 问题内容: 我有一种使用(1.4.4),(0.4.1)和(0.20.3)实现此目的的方法。 首先,我可以像这样在本地读取单个实木复合地板文件: 我还可以像这样在本地读取实木复合地板文件的目录: 两者都像魅力。现在,我想使用存储在S3存储桶中的文件远程实现相同的目的。我希望这样的事情行得通: 但这不是: 在彻底阅读了pyarrow的文档之后,目前看来这是不可能的。因此,我提出了以下解决方案: 从S3

  • 问题内容: 我需要从AWS S3读取实木复合地板数据。如果我为此使用aws sdk,则可以得到如下输入流: 但是apache木地板阅读器仅使用本地文件,如下所示: 所以我不知道如何解析实木复合地板文件的输入流。例如,对于csv文件,存在使用inputstream的CSVParser。 我知道使用Spark实现此目标的解决方案。像这样: 但是我不能使用火花。 谁能告诉我从s3读取镶木地板数据的任何解

  • 我正在尝试使用火花在S3文件中生成拼花文件,目的是稍后可以使用presto从拼花中查询。基本上,它看起来是这样的, <代码>Kafka- 我能够使用Spark在S3中生成拼花并且工作正常。现在,我正在查看presto,我想我发现它需要hive meta store才能从拼花中查询。即使拼花保存模式,我也无法让presto读取我的拼花文件。那么,这是否意味着在创建拼花文件时,火花作业还必须将元数据存

  • 问题内容: Spark解释实木复合地板的列的方式存在一些问题。 我有一个具有已确认架构的Oracle源(df.schema()方法): 然后将其另存为Parquet-df.write()。parquet()方法-具有相应的消息类型(由Spark确定): 然后,我的应用程序使用HashMap生成表DDL以进行类型转换,例如: 我的问题是Impala无法读取该表,因为它不接受LM_PERSON_ID作

  • 我将一个HTML文件加载到S3存储桶中。此文件旨在用作电子邮件正文的模板。 我使用了这个问题的一个答案(使用节点fs从aws s3 bucket读取文件)来帮助我从bucket中取出文件。现在看来,我有缓冲区数据,我需要转换回HTML,或者字符串版本也可以。到目前为止,我尝试过的事情都没有成功。。。 下面是我获取模板的代码: 当我记录“模板”返回的数据时,它看起来是这样的。。。我需要模板之类的东西

  • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?