标签: pyarrow

PyArrow:增量使用 ParquetWriter,无需将整个数据集保留在内存中(大于内存 parquet 文件)

我正在尝试将一个大的镶木地板文件写入磁盘(大于内存)。我天真地认为我可以聪明地使用 ParquetWriter 和 write_table 增量写入文件,如下所示(POC):

import pyarrow as pa
import pyarrow.parquet as pq
import pickle
import time

arrow_schema = pickle.load(open('schema.pickle', 'rb'))
rows_dataframe = pickle.load(open('rows.pickle', 'rb'))

output_file = 'test.parq'

with pq.ParquetWriter(
                output_file,
                arrow_schema,
                compression='snappy',
                allow_truncated_timestamps=True,
                version='2.0',  # Highest available schema
                data_page_version='2.0',  # Highest available schema
        ) as writer:
            for rows_dataframe in function_that_yields_data()
                writer.write_table(
                    pa.Table.from_pydict(
                            rows_dataframe,
                            arrow_schema
                    )
                )
Run Code Online (Sandbox Code Playgroud)

但即使我生成了块(比如我的例子中的 10000 行)并使用write_table它仍然将整个数据集保留在内存中。

事实证明,ParquetWriter 将整个数据集保留在内存中,同时增量写入磁盘

是否有办法强制 ParquetWriter 不将整个数据集保留在内存中,或者出于充分的原因根本不可能?

python parquet apache-arrow pyarrow

5
推荐指数
1
解决办法
2056
查看次数

Kubernetes 中的 Docker 容器中的内存映射文件与 Linux 中的常规进程中的内存映射文件的工作方式相同吗?

我有进程 A 和进程 B。进程 A 打开一个文件,调用 mmap 并写入它,进程 B 执行相同的操作,但当进程 A 完成写入时读取相同的映射区域。

使用 mmap,进程 B 应该从内存而不是磁盘读取文件,假设进程 A 没有调用 munmap。

如果我想将进程 A 和进程 B 部署到 Kubernetes 中同一 Pod 中的不同容器,内存映射 IO 的工作方式是否应该与初始示例相同?容器 B(进程 B)是否应该像在常规 Linux 桌面中一样从内存中读取文件?

假设两个容器位于同一个 Pod 中,并且从同一个持久卷读取/写入文件。我是否需要考虑特定类型的卷来实现 mmap IO?

如果您好奇的话,我正在使用 Apache Arrow 和 pyarrow 来读取和写入这些文件并实现零复制读取。

docker kubernetes apache-arrow pyarrow

5
推荐指数
1
解决办法
4336
查看次数

从 parquet 文件读取时 pandas 数据类型发生变化?

我对 pandas 和 parquet 文件类型是全新的。我有一个 python 脚本:

  1. 读入 hdfs parquet 文件
  2. 将其转换为 pandas 数据框
  3. 循环遍历特定列并更改一些值
  4. 将数据帧写回 parquet 文件

然后使用 impala-shell 将 parquet 文件导入回 hdfs。

我遇到的问题似乎与步骤 2 有关。我让它在读入数据帧后以及在步骤 3 中进行任何更改之前立即打印出数据帧的内容。它似乎正在更改数据类型和数据某些字段,当将其写回 parquet 文件时会导致问题。例子:

  • 在数据库中显示为NULL的字段在数据帧的打印输出中被替换为字符串“None”(对于字符串列)或字符串“nan”(对于数字列)。
  • 数据库中应为 Int 且值为 0 的字段将更改为“0.00000”,并在数据帧中转换为浮点型。

看来它实际上正在更改这些值,因为当它写入 parquet 文件并将其导入 hdfs 并运行查询时,我收到如下错误:

WARNINGS: File '<path>/test.parquet' has an incompatible Parquet schema for column 
'<database>.<table>.tport'. Column type: INT, Parquet schema:
optional double tport [i:1 d:1 r:0]
Run Code Online (Sandbox Code Playgroud)

我不知道为什么它会改变数据而不是保持原样。如果发生这种情况,我不知道是否需要循环每一列并将所有这些替换回其原始值,或者是否有其他方法告诉它不要管它们。

我一直在使用这个参考页面: http://arrow.apache.org/docs/python/parquet.html

它用

pq.read_table(in_file) 
Run Code Online (Sandbox Code Playgroud)

读取镶木地板文件,然后

df = table2.to_pandas()
Run Code Online (Sandbox Code Playgroud)

转换为我可以循环并更改列的数据框。我不明白为什么它会改变数据,而且我找不到办法防止这种情况发生。我需要用比 …

dataframe python-3.x pandas parquet pyarrow

5
推荐指数
1
解决办法
1万
查看次数

我可以通过索引访问 Parquet 文件而不将整个文件读入内存吗?

我刚刚读到 HDF5 允许您访问数据查找,而无需将整个文件读入内存。

这种寻找行为在没有 Java 的 Parquet 文件中是否可能(非 pyspark 解决方案)?我使用 Parquet 是因为它有强大的 dtype 支持。

import h5py

f = h5py.File('my_file.hdf5', 'w')
dset = f.create_dataset('coords', data=my_ndarray)
f.close()

f = h5py.File('my_file.hdf5', 'r')
dset = f['coords']
my_array = dset[-2:]
Run Code Online (Sandbox Code Playgroud)

https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata

我在这里看到 Parquet 元数据具有num_row_groups: 1 (or more). 但我不确定这如何帮助我获取行 [23, 42, 117, 99293184]。

parquet fastparquet pyarrow

5
推荐指数
1
解决办法
4670
查看次数

无法将 pandas 数据框保存到镶木地板中,并将浮点数列表作为单元格值

我有一个结构如下的数据框:

                                                Coumn1                                             Coumn2
0    (0.00030271668219938874, 0.0002655923890415579...  (0.0016430083196610212, 0.0014970217598602176,...
1    (0.00015607803652528673, 0.0001314736582571640...  (0.0022136708721518517, 0.0014974646037444472,...
2    (0.011317798867821693, 0.011339936405420303, 0...  (0.004868391435593367, 0.004406007472425699, 0...
3    (3.94578673876822e-05, 3.075833956245333e-05, ...  (0.0075020878575742245, 0.0096737677231431, 0....
4    (0.0004926157998852432, 0.0003811710048466921,...  (0.010351942852139473, 0.008231297135353088, 0...
..                                                 ...                                                ...
130  (0.011190211400389671, 0.011337820440530777, 0...  (0.010182800702750683, 0.011351295746862888, 0...
131  (0.006286659277975559, 0.007315031252801418, 0...  (0.02104150503873825, 0.02531484328210354, 0.0...
132  (0.0022791570518165827, 0.0025983047671616077,...  (0.008847278542816639, 0.009222050197422504, 0...
133  (0.0007059817435219884, 0.0009831463685259223,...  (0.0028264704160392284, 0.0029402063228189945,...
134  (0.0018992726691067219, 0.002058899961411953, ...  (0.0019639385864138603, 0.002009353833273053, ...

[135 rows x 2 columns]
Run Code Online (Sandbox Code Playgroud)

其中每个单元格保存一些浮点值的列表/元组:

type(psd_res.data_frame['Column1'][0])
<class 'tuple'>
type(psd_res.data_frame['Column1'][0][0])
<class 'numpy.float64'> …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet pyarrow

5
推荐指数
1
解决办法
7984
查看次数

dask 读取镶木地板并指定架构

是否有 dask 相当于 Spark 在读取镶木地板文件时指定模式的能力?可能使用传递给 pyarrow 的 kwargs 吗?

我的桶中有一堆镶木地板文件,但某些字段的名称略有不一致。我可以在阅读它们后创建一个自定义延迟函数来处理这些情况,但我希望在通过全局打开它们时可以指定模式。也许不是,因为我猜想通过 globing 打开然后会尝试将它们连接起来。由于字段名称不一致,目前此操作失败。

创建镶木地板文件:

import dask.dataframe as dd

df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-03",
    dtypes={"id": int, "z": int},
    freq="1h",
    partition_freq="24h",
)

df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
Run Code Online (Sandbox Code Playgroud)

通过 dask 读取它并在读取后指定模式:

df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
Run Code Online (Sandbox Code Playgroud)

通过 Spark 读取它并指定模式:

from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()

schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("a", T.FloatType()),
        T.StructField("timestamp", T.TimestampType()),
    ]
)

df = spark.read.format("parquet").schema(schema).load("df.parquet")
Run Code Online (Sandbox Code Playgroud)

pandas apache-spark parquet dask pyarrow

5
推荐指数
1
解决办法
2257
查看次数

PyArrow:如何使用新的文件系统界面将文件从本地复制到远程?

有人可以给我一个提示,告诉我如何使用PyArrow 的新文件系统接口(即 upload、copyFromLocal)将文件从本地文件系统复制到 HDFS 文件系统吗?

我已经反复阅读了文档,并尝试了一些方法(使用 copy_file() 和 FS URI),但似乎都不起作用。旧版 HDFS API的使用很简单,但它已被弃用,尽管新 API 似乎不完整。当然,在文件描述符之间移动数据块是一种解决方案,但是为什么copy_file()存在呢?

python hdfs apache-arrow pyarrow

5
推荐指数
1
解决办法
1526
查看次数

如何将 ndarray/多维数组转换为 parquet 文件?

我有一个 <class 'numpy.ndarray'> 数组,我想将其保存到 parquet 文件中以传递给我正在构建的 ML 模型。我的数组有 159573 个数组,每个数组有 1395 个数组。

这是我的数据示例:

[[0.         0.         0.         ... 0.24093714 0.75547471 0.74532781]
 [0.         0.         0.         ... 0.24093714 0.75547471 0.74532781]
 [0.         0.         0.         ... 0.24093714 0.75547471 0.74532781]
 ...
 [0.         0.         0.         ... 0.89473684 0.29282009 0.29277004]
 [0.         0.         0.         ... 0.89473684 0.29282009 0.29277004]
 [0.         0.         0.         ... 0.89473684 0.29282009 0.29277004]]
Run Code Online (Sandbox Code Playgroud)

我尝试使用以下代码进行转换:

import pyarrow as pa
pa_table = pa.table({"data": Main_x})
pa.parquet.write_table(pa_table, "full_data.parquet")
Run Code Online (Sandbox Code Playgroud)

我得到这个堆栈跟踪:

5 frames
/usr/local/lib/python3.7/dist-packages/pyarrow/table.pxi in pyarrow.lib.table()

/usr/local/lib/python3.7/dist-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pydict()

/usr/local/lib/python3.7/dist-packages/pyarrow/array.pxi in pyarrow.lib.asarray()

/usr/local/lib/python3.7/dist-packages/pyarrow/array.pxi …
Run Code Online (Sandbox Code Playgroud)

numpy parquet pyarrow

5
推荐指数
1
解决办法
6933
查看次数

将 python-polars 连接到 SQL Server(目前不支持)

如何直接将 MS SQL Server 连接到 Polars?

该文档没有列出任何支持的连接,但建议使用 pandas。

更新:

SQL Server 身份验证按答案工作,但 Windows 域身份验证不起作用。参见问题

sql-server sqlalchemy pyarrow python-polars

5
推荐指数
3
解决办法
6112
查看次数

使用 pyarrow 作为 dtype_backend 的 Pandas 2.0 数据帧上的聚合速度非常慢

假设我有以下数据框:

代码 价格
AA1 10
AA1 20
BB2 30

我想对其执行以下操作:

df.groupby("code").aggregate({
    "price": "sum"
})
Run Code Online (Sandbox Code Playgroud)

我尝试使用 Pandas 2.0 中引入的新 pyarrow dtypes,并创建了 3 个副本,对于每个副本,我测量了上述操作的执行时间(5 次执行的平均值)。

代码列数据类型 价格列数据类型 执行时间处理时间
目的 浮动64 2.94秒
字符串[pyarrow] 双[pyarrow] 49.5秒
字符串[pyarrow] 浮动64 1.11秒

谁能解释为什么与标准 numpy float64 dtype 相比,在具有 double pyarrow dtype 的列上应用聚合函数如此慢?

python group-by pandas apache-arrow pyarrow

5
推荐指数
1
解决办法
1283
查看次数