当前位置: 首页 > 知识库问答 >
问题:

使用pyrow如何附加到拼花文件?

都浩淼
2023-03-14

如何使用pyarrow向拼花地板文件添加/更新?

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


 table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
 table3 = pd.DataFrame({'six': [-1, np.nan, 2.5], 'nine': ['foo', 'bar', 'baz'], 'ten': [True, False, True]})


pq.write_table(table2, './dataNew/pqTest2.parquet')
#append pqTest2 here?  

我在文档中找不到任何关于附加拼花文件的内容。此外,您是否可以将pyarrow与多处理一起使用来插入/更新数据。

共有3个答案

曾英睿
2023-03-14

一般来说,拼花地板数据集由多个文件组成,因此可以通过将其他文件写入数据所属的同一目录来进行追加。能够轻松地连接多个文件将非常有用。我打开了https://issues.apache.org/jira/browse/PARQUET-1154为了使这在C(因此也是Python)中轻松实现

江飞白
2023-03-14

在您的情况下,列名不一致,我使三个示例数据帧的列名一致,以下代码对我有效。

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Method writes/append dataframes in parquet format.

    This method is used to write pandas DataFrame as pyarrow Table in parquet format. If the methods is invoked
    with writer, it appends dataframe to the already written pyarrow table.

    :param dataframe: pd.DataFrame to be written in parquet format.
    :param filepath: target file location for parquet file.
    :param writer: ParquetWriter object to write pyarrow tables in parquet format.
    :return: ParquetWriter object. This can be passed in the subsequenct method calls to append DataFrame
        in the pyarrow Table
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer


if __name__ == '__main__':

    table1 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table3 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    writer = None
    filepath = '/tmp/verify_pyarrow_append.parquet'
    table_list = [table1, table2, table3]

    for table in table_list:
        writer = append_to_parquet_table(table, filepath, writer)

    if writer:
        writer.close()

    df = pd.read_parquet(filepath)
    print(df)

输出:

   one  three  two
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
令狐新翰
2023-03-14

我遇到了同样的问题,我想我能够使用以下方法解决它:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


chunksize=10000 # this is the number of lines

pqwriter = None
for i, df in enumerate(pd.read_csv('sample.csv', chunksize=chunksize)):
    table = pa.Table.from_pandas(df)
    # for the first chunk of records
    if i == 0:
        # create a parquet write object giving it an output file
        pqwriter = pq.ParquetWriter('sample.parquet', table.schema)            
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()
 类似资料:
  • 使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中

  • 我如何一次加载5年的拼花数据并复制到一个表中?因为1个月的负荷比我1.5个小时,5年就要花我90个小时。如果有可能并行加载?我该怎么做呢? 谢谢

  • 我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。

  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?

  • 我正在使用下面的代码片段来保存数据。它只会在同一分区文件夹下创建一个新的拼花地板文件。是否有任何方法可以将数据真正附加到现有的拼花地板文件中。所以,如果一天中有许多附件,我们就不会有多个文件? <代码>测向。聚结(1)。写模式(“追加”)。partitionBy(“paritionKey”)。拼花地板(“…\parquet\u file\u folder\”) 非常感谢你的帮助。

  • 我正在尝试使用SparkStreaming将流数据存储到HDFS中,但它会继续在新文件中创建附加到一个文件或几个多个文件中 如果它一直创建n个文件,我觉得效率不会很高 代码 在我的pom中,我使用了各自的依赖项: 火花-core_2.11 火花-sql_2.11 火花-streaming_2.11 火花流-kafka-0-10_2.11