我正在尝试将一个大的镶木地板文件写入磁盘(大于内存)。我天真地认为我可以聪明地使用 ParquetWriter 和 write_table 增量写入文件,如下所示(POC):
import pyarrow as pa
import pyarrow.parquet as pq
import pickle
import time
arrow_schema = pickle.load(open('schema.pickle', 'rb'))
rows_dataframe = pickle.load(open('rows.pickle', 'rb'))
output_file = 'test.parq'
with pq.ParquetWriter(
output_file,
arrow_schema,
compression='snappy',
allow_truncated_timestamps=True,
version='2.0', # Highest available schema
data_page_version='2.0', # Highest available schema
) as writer:
for rows_dataframe in function_that_yields_data()
writer.write_table(
pa.Table.from_pydict(
rows_dataframe,
arrow_schema
)
)
Run Code Online (Sandbox Code Playgroud)
但即使我生成了块(比如我的例子中的 10000 行)并使用write_table它仍然将整个数据集保留在内存中。
事实证明,ParquetWriter 将整个数据集保留在内存中,同时增量写入磁盘。
是否有办法强制 ParquetWriter 不将整个数据集保留在内存中,或者出于充分的原因根本不可能?
我有进程 A 和进程 B。进程 A 打开一个文件,调用 mmap 并写入它,进程 B 执行相同的操作,但当进程 A 完成写入时读取相同的映射区域。
使用 mmap,进程 B 应该从内存而不是磁盘读取文件,假设进程 A 没有调用 munmap。
如果我想将进程 A 和进程 B 部署到 Kubernetes 中同一 Pod 中的不同容器,内存映射 IO 的工作方式是否应该与初始示例相同?容器 B(进程 B)是否应该像在常规 Linux 桌面中一样从内存中读取文件?
假设两个容器位于同一个 Pod 中,并且从同一个持久卷读取/写入文件。我是否需要考虑特定类型的卷来实现 mmap IO?
如果您好奇的话,我正在使用 Apache Arrow 和 pyarrow 来读取和写入这些文件并实现零复制读取。
我对 pandas 和 parquet 文件类型是全新的。我有一个 python 脚本:
然后使用 impala-shell 将 parquet 文件导入回 hdfs。
我遇到的问题似乎与步骤 2 有关。我让它在读入数据帧后以及在步骤 3 中进行任何更改之前立即打印出数据帧的内容。它似乎正在更改数据类型和数据某些字段,当将其写回 parquet 文件时会导致问题。例子:
看来它实际上正在更改这些值,因为当它写入 parquet 文件并将其导入 hdfs 并运行查询时,我收到如下错误:
WARNINGS: File '<path>/test.parquet' has an incompatible Parquet schema for column
'<database>.<table>.tport'. Column type: INT, Parquet schema:
optional double tport [i:1 d:1 r:0]
Run Code Online (Sandbox Code Playgroud)
我不知道为什么它会改变数据而不是保持原样。如果发生这种情况,我不知道是否需要循环每一列并将所有这些替换回其原始值,或者是否有其他方法告诉它不要管它们。
我一直在使用这个参考页面: http://arrow.apache.org/docs/python/parquet.html
它用
pq.read_table(in_file)
Run Code Online (Sandbox Code Playgroud)
读取镶木地板文件,然后
df = table2.to_pandas()
Run Code Online (Sandbox Code Playgroud)
转换为我可以循环并更改列的数据框。我不明白为什么它会改变数据,而且我找不到办法防止这种情况发生。我需要用比 …
我刚刚读到 HDF5 允许您访问数据查找,而无需将整个文件读入内存。
这种寻找行为在没有 Java 的 Parquet 文件中是否可能(非 pyspark 解决方案)?我使用 Parquet 是因为它有强大的 dtype 支持。
import h5py
f = h5py.File('my_file.hdf5', 'w')
dset = f.create_dataset('coords', data=my_ndarray)
f.close()
f = h5py.File('my_file.hdf5', 'r')
dset = f['coords']
my_array = dset[-2:]
Run Code Online (Sandbox Code Playgroud)
https://arrow.apache.org/docs/python/parquet.html#inspecting-the-parquet-file-metadata
我在这里看到 Parquet 元数据具有num_row_groups: 1 (or more). 但我不确定这如何帮助我获取行 [23, 42, 117, 99293184]。
我有一个结构如下的数据框:
Coumn1 Coumn2
0 (0.00030271668219938874, 0.0002655923890415579... (0.0016430083196610212, 0.0014970217598602176,...
1 (0.00015607803652528673, 0.0001314736582571640... (0.0022136708721518517, 0.0014974646037444472,...
2 (0.011317798867821693, 0.011339936405420303, 0... (0.004868391435593367, 0.004406007472425699, 0...
3 (3.94578673876822e-05, 3.075833956245333e-05, ... (0.0075020878575742245, 0.0096737677231431, 0....
4 (0.0004926157998852432, 0.0003811710048466921,... (0.010351942852139473, 0.008231297135353088, 0...
.. ... ...
130 (0.011190211400389671, 0.011337820440530777, 0... (0.010182800702750683, 0.011351295746862888, 0...
131 (0.006286659277975559, 0.007315031252801418, 0... (0.02104150503873825, 0.02531484328210354, 0.0...
132 (0.0022791570518165827, 0.0025983047671616077,... (0.008847278542816639, 0.009222050197422504, 0...
133 (0.0007059817435219884, 0.0009831463685259223,... (0.0028264704160392284, 0.0029402063228189945,...
134 (0.0018992726691067219, 0.002058899961411953, ... (0.0019639385864138603, 0.002009353833273053, ...
[135 rows x 2 columns]
Run Code Online (Sandbox Code Playgroud)
其中每个单元格保存一些浮点值的列表/元组:
type(psd_res.data_frame['Column1'][0])
<class 'tuple'>
type(psd_res.data_frame['Column1'][0][0])
<class 'numpy.float64'> …Run Code Online (Sandbox Code Playgroud) 是否有 dask 相当于 Spark 在读取镶木地板文件时指定模式的能力?可能使用传递给 pyarrow 的 kwargs 吗?
我的桶中有一堆镶木地板文件,但某些字段的名称略有不一致。我可以在阅读它们后创建一个自定义延迟函数来处理这些情况,但我希望在通过全局打开它们时可以指定模式。也许不是,因为我猜想通过 globing 打开然后会尝试将它们连接起来。由于字段名称不一致,目前此操作失败。
创建镶木地板文件:
import dask.dataframe as dd
df = dd.demo.make_timeseries(
start="2000-01-01",
end="2000-01-03",
dtypes={"id": int, "z": int},
freq="1h",
partition_freq="24h",
)
df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
Run Code Online (Sandbox Code Playgroud)
通过 dask 读取它并在读取后指定模式:
df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
Run Code Online (Sandbox Code Playgroud)
通过 Spark 读取它并指定模式:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()
schema = T.StructType(
[
T.StructField("id", T.IntegerType()),
T.StructField("a", T.FloatType()),
T.StructField("timestamp", T.TimestampType()),
]
)
df = spark.read.format("parquet").schema(schema).load("df.parquet")
Run Code Online (Sandbox Code Playgroud) 有人可以给我一个提示,告诉我如何使用PyArrow 的新文件系统接口(即 upload、copyFromLocal)将文件从本地文件系统复制到 HDFS 文件系统吗?
我已经反复阅读了文档,并尝试了一些方法(使用 copy_file() 和 FS URI),但似乎都不起作用。旧版 HDFS API的使用很简单,但它已被弃用,尽管新 API 似乎不完整。当然,在文件描述符之间移动数据块是一种解决方案,但是为什么copy_file()存在呢?
我有一个 <class 'numpy.ndarray'> 数组,我想将其保存到 parquet 文件中以传递给我正在构建的 ML 模型。我的数组有 159573 个数组,每个数组有 1395 个数组。
这是我的数据示例:
[[0. 0. 0. ... 0.24093714 0.75547471 0.74532781]
[0. 0. 0. ... 0.24093714 0.75547471 0.74532781]
[0. 0. 0. ... 0.24093714 0.75547471 0.74532781]
...
[0. 0. 0. ... 0.89473684 0.29282009 0.29277004]
[0. 0. 0. ... 0.89473684 0.29282009 0.29277004]
[0. 0. 0. ... 0.89473684 0.29282009 0.29277004]]
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下代码进行转换:
import pyarrow as pa
pa_table = pa.table({"data": Main_x})
pa.parquet.write_table(pa_table, "full_data.parquet")
Run Code Online (Sandbox Code Playgroud)
我得到这个堆栈跟踪:
5 frames
/usr/local/lib/python3.7/dist-packages/pyarrow/table.pxi in pyarrow.lib.table()
/usr/local/lib/python3.7/dist-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pydict()
/usr/local/lib/python3.7/dist-packages/pyarrow/array.pxi in pyarrow.lib.asarray()
/usr/local/lib/python3.7/dist-packages/pyarrow/array.pxi …Run Code Online (Sandbox Code Playgroud) 如何直接将 MS SQL Server 连接到 Polars?
该文档没有列出任何支持的连接,但建议使用 pandas。
更新:
SQL Server 身份验证按答案工作,但 Windows 域身份验证不起作用。参见问题
假设我有以下数据框:
| 代码 | 价格 |
|---|---|
| AA1 | 10 |
| AA1 | 20 |
| BB2 | 30 |
我想对其执行以下操作:
df.groupby("code").aggregate({
"price": "sum"
})
Run Code Online (Sandbox Code Playgroud)
我尝试使用 Pandas 2.0 中引入的新 pyarrow dtypes,并创建了 3 个副本,对于每个副本,我测量了上述操作的执行时间(5 次执行的平均值)。
| 代码列数据类型 | 价格列数据类型 | 执行时间处理时间 |
|---|---|---|
| 目的 | 浮动64 | 2.94秒 |
| 字符串[pyarrow] | 双[pyarrow] | 49.5秒 |
| 字符串[pyarrow] | 浮动64 | 1.11秒 |
谁能解释为什么与标准 numpy float64 dtype 相比,在具有 double pyarrow dtype 的列上应用聚合函数如此慢?
pyarrow ×10
parquet ×6
apache-arrow ×4
pandas ×4
python ×4
apache-spark ×1
dask ×1
dataframe ×1
docker ×1
fastparquet ×1
group-by ×1
hdfs ×1
kubernetes ×1
numpy ×1
python-3.x ×1
sql-server ×1
sqlalchemy ×1