How do you append to a Parquet file using pyarrow?

Mer*_*lin 18 python pandas parquet pyarrow

How can I append/update a Parquet file with pyarrow?

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
table3 = pd.DataFrame({'six': [-1, np.nan, 2.5], 'nine': ['foo', 'bar', 'baz'], 'ten': [True, False, True]})


pq.write_table(pa.Table.from_pandas(table2), './dataNew/pqTest2.parquet')
# append pqTest2 here?

I can't find anything in the documentation about appending to Parquet files. Also, can you use pyarrow with multiprocessing to insert/update the data?

小智 15

I ran into the same problem, and I think I was able to solve it with the following:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


chunksize=10000 # this is the number of lines

pqwriter = None
for i, df in enumerate(pd.read_csv('sample.csv', chunksize=chunksize)):
    table = pa.Table.from_pandas(df)
    # for the first chunk of records
    if i == 0:
        # create a parquet write object giving it an output file
        pqwriter = pq.ParquetWriter('sample.parquet', table.schema)
        pqwriter.write_table(table)
    # subsequent chunks can be written to the same file
    else:
        pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()

  • Of course it depends on the data, but in my experience `chunksize=10000` is far too large. Chunk size values of around a hundred work much faster for me in most cases. (2 upvotes)
  • The `else` after the `if` is unnecessary, since you write the table in both branches. (2 upvotes)
  • This solution only works while the writer is still open ... A better way is to put the files in a directory. pandas/pyarrow will append both files to the dataframe when reading the directory. (2 upvotes)

Con*_*ngo 10

A demo of appending a Pandas dataframe to an existing .parquet file.

Note: the other answers cannot append to an existing .parquet file. This one can; see the discussion at the end.

Tested on Python v3.9 on Windows and Linux.

Install PyArrow using pip:

pip install pyarrow==6.0.1

Or with Anaconda / Miniconda:

conda install -c conda-forge pyarrow=6.0.1 -y

Demo code:

# Q. Demo?
# A. Demo of appending to an existing .parquet file by memory mapping the original file, appending the new dataframe, then writing the new file out.

import os
import numpy as np
import pandas as pd
import pyarrow as pa  
import pyarrow.parquet as pq  

filepath = "parquet_append.parquet"

Method 1 (of 2)

Simple way: use pandas to read the original .parquet file, append, and write the entire file back.

# Create parquet file.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
df.to_parquet(filepath)  # ... write to file.

# Append to original parquet file.
df = pd.read_parquet(filepath)  # Read original ...
df2 = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
df3 = pd.concat([df, df2])  # ... concatenate together ...
df3.to_parquet(filepath)  # ... overwrite original file.

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

Method 2 (of 2)

More complex but faster: use native PyArrow calls to memory-map the original file, append the new dataframe, and write the new file out.

# Write initial file using PyArrow.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
table = pa.Table.from_pandas(df)
pq.write_table(table, where=filepath)

def parquet_append(filepath: "Path | str", df: pd.DataFrame) -> None:
    """
    Append a dataframe to an existing .parquet file. Reads the original .parquet file in, appends the new dataframe, and writes the new .parquet file out.
    :param filepath: Filepath for the parquet file.
    :param df: Pandas dataframe to append. Must have the same schema as the original.
    """
    table_original_file = pq.read_table(source=filepath,  pre_buffer=False, use_threads=True, memory_map=True)  # Use memory map for speed.
    table_to_append = pa.Table.from_pandas(df)
    table_to_append = table_to_append.cast(table_original_file.schema)  # Attempt to cast new schema to existing, e.g. datetime64[ns] to datetime64[us] (may throw otherwise).
    handle = pq.ParquetWriter(filepath, table_original_file.schema)  # Overwrite old file with empty. WARNING: PRODUCTION LEVEL CODE SHOULD BE MORE ATOMIC: WRITE TO A TEMPORARY FILE, DELETE THE OLD, RENAME. THEN FAILURES WILL NOT LOSE DATA.
    handle.write_table(table_original_file)
    handle.write_table(table_to_append)
    handle.close()  # Writes binary footer. Until this occurs, .parquet file is not usable.

# Append to original parquet file.
df = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
parquet_append(filepath, df)

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

Discussion

The answers from @Ibraheem Ibraheem and @yardstick17 cannot be used to append to an existing .parquet file:

  • Limitation 1: after .close() is called, the file cannot be appended to. Once the footer is written, everything is set in stone;
  • Limitation 2: the .parquet file cannot be read by any other program until .close() is called (it will raise an exception, as the binary footer is missing).

Combined, these limitations mean that they cannot be used to append to an existing .parquet file; they can only be used to write a .parquet file in chunks. The technique above removes these limitations, at the cost of being less efficient, since the entire file must be rewritten to append to the end. After extensive research, I believe it is not possible to append to an existing .parquet file with the existing PyArrow libraries (as of v6.0.1).

It would be possible to modify this to merge multiple .parquet files in a folder into a single .parquet file.

It would be possible to perform an efficient upsert: pq.read_table() has filters on columns and rows, so if the rows in the original table are filtered out on load, the rows in the new table effectively replace the old. This would be more useful for time-series data.


Wes*_*ney 9

In general, Parquet datasets consist of multiple files, so you can append by writing an additional file into the same directory the data belongs to. It would be useful to be able to concatenate multiple files easily, so I opened https://issues.apache.org/jira/browse/PARQUET-1154 to make this possible to do easily in C++ (and therefore Python).

  • Wouldn't the parquet-tools command `parquet-merge` be an option? - at least from the command line? (Disclaimer: I haven't tried it yet) (2 upvotes)

Ami*_*aha 9

In your case the column names are not consistent. I made the column names consistent for all three sample dataframes, and the following code worked for me.

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Method writes/append dataframes in parquet format.

    This method is used to write pandas DataFrame as pyarrow Table in parquet format. If the methods is invoked
    with writer, it appends dataframe to the already written pyarrow table.

    :param dataframe: pd.DataFrame to be written in parquet format.
    :param filepath: target file location for parquet file.
    :param writer: ParquetWriter object to write pyarrow tables in parquet format.
    :return: ParquetWriter object. This can be passed in the subsequenct method calls to append DataFrame
        in the pyarrow Table
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer


if __name__ == '__main__':

    table1 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table3 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    writer = None
    filepath = '/tmp/verify_pyarrow_append.parquet'
    table_list = [table1, table2, table3]

    for table in table_list:
        writer = append_to_parquet_table(table, filepath, writer)

    if writer:
        writer.close()

    df = pd.read_parquet(filepath)
    print(df)

Output:

   one  three  two
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz


sha*_*359 6

The accepted answer works as long as you keep the pyarrow parquet writer open. Once the writer is closed, we cannot append row groups to the parquet file. pyarrow does not have any implementation for appending to an already existing parquet file.


Row groups can be appended to an already existing parquet file using fastparquet. Here is an answer that explains this with an example.


From the fastparquet documentation:


append: bool (False) or 'overwrite'. If False, construct the dataset from scratch; if True, add new row group(s) to an existing dataset. In the latter case, the dataset must exist and the schema must match the input data.

from fastparquet import write
write('output.parquet', df, append=True)

Update: there is also a feature request to include this in pyarrow - JIRA
