标签: apache-arrow

如何将巨大的熊猫数据帧保存到HDFS?

我正在处理熊猫和Spark数据帧。数据帧始终很大(> 20 GB),而标准的火花功能不足以容纳这些大小。目前,我将我的熊猫数据框转换为火花数据框,如下所示:

dataframe = spark.createDataFrame(pandas_dataframe)  
Run Code Online (Sandbox Code Playgroud)

我进行这种转换是因为通过火花将数据帧写入hdfs非常容易:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Run Code Online (Sandbox Code Playgroud)

但是,对于大于2 GB的数据帧,转换失败。如果将spark数据框转换为熊猫,则可以使用pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)
Run Code Online (Sandbox Code Playgroud)

这是从Spark到Pandas的快速会话,它也适用于大于2 GB的数据帧。我还找不到其他方法可以做到这一点。意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。问题是我真的找不到如何将熊猫数据帧写入hdfs。

我的熊猫版本:0.19.0

python pandas apache-spark apache-arrow pyarrow

7
推荐指数
1
解决办法
9058
查看次数

将 Pandas DataFrame 与 In-Memory Feather 相互转换

使用pandas 中IO 工具可以将 a 转换为DataFrame内存中的羽化缓冲区:

import pandas as pd  
from io import BytesIO 

df = pd.DataFrame({'a': [1,2], 'b': [3.0,4.0]})  

buf = BytesIO()

df.to_feather(buf)
Run Code Online (Sandbox Code Playgroud)

但是,使用相同的缓冲区转换回 DataFrame

pd.read_feather(buf)
Run Code Online (Sandbox Code Playgroud)

结果报错:

ArrowInvalid:不是羽毛文件

如何将 DataFrame 转换为内存中的羽化表示,并相应地转换回 DataFrame?

预先感谢您的考虑和回应。

python python-3.x pandas feather apache-arrow

7
推荐指数
1
解决办法
4069
查看次数

AWS EMR - ModuleNotFoundError:没有名为“pyarrow”的模块

我在使用 Apache Arrow Spark 集成时遇到了这个问题。

使用 AWS EMR 和 Spark 2.4.3

在本地 Spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。

在spark-env.sh中设置这些

export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
Run Code Online (Sandbox Code Playgroud)

在 Spark shell 中确认了这一点

spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
Run Code Online (Sandbox Code Playgroud)

使用 apache arrow 集成运行基本的 pandas_udf 会导致错误

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)

aws emr 上出现错误 …

amazon-emr apache-spark pyspark apache-arrow pyarrow

7
推荐指数
1
解决办法
7253
查看次数

使用 pyarrow 的 Python 错误 - ArrowNotImplementedError:未构建对编解码器“snappy”的支持

使用 Python、Parquet 和 SparkArrowNotImplementedError: Support for codec 'snappy' not built升级到pyarrow=3.0.0. 我以前没有这个错误的版本是pyarrow=0.17. 该错误出现在 中pyarrow=1.0.1但确实出现在pyarrow=2.0.0. 这个想法是使用 Snappy 压缩将 Pandas DataFrame 编写为 Parquet 数据集(在 Windows 上),然后使用 Spark 处理 Parquet 数据集。

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    'x': [0, 0, 0, 1, 1, 1], 
    'a': np.random.random(6), 
    'b': np.random.random(6)})
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(table, root_path=r'c:/data', partition_cols=['x'], flavor='spark')
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

parquet apache-arrow pyarrow

7
推荐指数
2
解决办法
3861
查看次数

R 中箭头数据集过滤表达式的正确语法

我正在尝试使用arrow(最近实现的) DataSet API 将文件目录读入内存,并利用c++后端来过滤行和列。我想arrow直接使用包函数,而不是样式动词的包装函数dplyr。截至目前,这些函数还处于其生命周期的早期阶段,因此我很难找到一些说明语法的示例。

为了理解语法,我创建了一个非常小的测试示例。前两个查询按预期工作。

library(arrow) ## version 4.0.0

write.csv(mtcars,"ArrowTest_mtcars/mtcars.csv")
## Define a dataset object
DS <- arrow::open_dataset(sources = "ArrowTest_mtcars", format = "text")

## Generate a basic scanner 
AT <- DS$NewScan()$UseThreads()$Finish()$ToTable()
head(as.data.frame(AT), n = 3)
##                      mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1         Mazda RX4 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2     Mazda RX4 Wag 21.0   6  160 110 …
Run Code Online (Sandbox Code Playgroud)

r apache-arrow

7
推荐指数
1
解决办法
1277
查看次数

如何使用 Arrow 对 CSV 进行分块?

我正在尝试做什么

我正在使用PyArrow读取一些 CSV 并将它们转换为 Parquet。我读到的一些文件有很多列并且内存占用很大(足以使运行作业的机器崩溃)。我试图在读取 CSV 时对文件进行分块,其方式与 Pandas read_csvchunksize工作方式类似。

例如,这就是分块代码在 pandas 中的工作方式:

chunks = pandas.read_csv(data, chunksize=100, iterator=True)

# Iterate through chunks
for chunk in chunks:
    do_stuff(chunk)
Run Code Online (Sandbox Code Playgroud)

我想将类似的功能移植到 Arrow

我尝试过做什么

我注意到 Arrow 有ReadOptions其中包含一个block_size参数,我想也许我可以像这样使用它:

# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
    input_file=input_buffer,
    read_options=arrow_csv.ReadOptions(
        use_threads=True,
        block_size=4096
    )
)

# Iterate through batches
for batch in arrow_table.to_batches():
    do_stuff(batch)
Run Code Online (Sandbox Code Playgroud)

由于这个 ( block_size) 似乎没有返回迭代器,所以我的印象是这仍然会让 Arrow 读取内存中的整个表,从而重新创建我的问题。

最后,我知道我可以首先使用 Pandas 读取 csv 并对其进行分块,然后转换为 Arrow 表。但我试图避免使用 Pandas,只使用 Arrow。 …

python apache-arrow pyarrow

7
推荐指数
1
解决办法
4723
查看次数

Parquet 格式的可重复性/确定性如何?

我正在向非常熟悉 Apache Parquet 二进制布局的人寻求建议:

F(a) = b进行完全确定性的数据转换F,并使用整个软件堆栈(框架、箭头和 parquet 库)的相同版本 -每次保存到 Parquet 中时,我在不同主机上获得相同的数据帧二进制表示的可能性有多大?bb

换句话说,Parquet 在二进制级别上的可重现性如何?当数据逻辑上相同时,什么会导致二进制差异?

  • 由于对齐,值之间是否可能存在一些 uninit 内存?
  • 假设所有序列化设置(压缩、分块、字典的使用等)都相同,结果是否仍然会漂移?

语境

我正在开发一个系统,用于完全可重现和确定性的数据处理和计算数据集哈希来断言这些保证。

我的主要目标是确保数据集b包含一组相同的记录作为数据集b'- 这当然与对 Arrow/Parquet 的二进制表示进行哈希处理非常不同。不想处理存储格式的可重复性,我一直在计算逻辑数据哈希。这是缓慢但灵活的,例如,即使记录被重新排序(我认为是等效的数据集),我的哈希值也保持不变。

但是,当考虑与依赖于文件哈希的IPFS其他内容可寻址存储集成时,只有一个哈希(物理)而不是两个(逻辑+物理)会大大简化设计,但这意味着我必须保证Parquet 文件是可复制的。


更新

我决定暂时继续使用逻辑哈希。

我创建了一个新的 Rust 箱arrow-digest,它实现了 Arrow 数组和记录批次的稳定哈希,并努力隐藏与编码相关的差异。如果有人发现哈希算法有用并希望用另一种语言实现它,则该板条箱的自述文件描述了该算法。

当我将其集成到我正在开发的去中心化数据处理工具中时,我将继续扩展支持的类型集。

从长远来看,我不确定逻辑哈希是否是最好的前进方式 - Parquet 的一个子集,它牺牲了一些效率,只是为了使文件布局具有确定性,这可能是内容可寻址性的更好选择。

parquet apache-arrow

7
推荐指数
1
解决办法
738
查看次数

如何从多个进程(可能来自不同的语言)使用 Apache Arrow IPC?

我不知道从哪里开始,所以寻找一些指导。我正在寻找一种在一个进程中创建一些数组/表,并可以从另一个进程访问(只读)的方法。

所以我创建了一个pyarrow.Table这样的:

a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])

a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
#   0,
#   1,
#   2
# ]

a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
#   "foo",
#   "bar",
#   "baz"
# ]

tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])

tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]
Run Code Online (Sandbox Code Playgroud)

现在我如何从不同的进程中读取它?我以为我会使用multiprocessing.shared_memory.SharedMemory,但这不太有效:

shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out: …
Run Code Online (Sandbox Code Playgroud)

python ipc apache-arrow pyarrow

7
推荐指数
1
解决办法
1050
查看次数

从 R 中的 CSV 文件创建镶木地板文件目录

我遇到了越来越多的情况,需要在 R 中使用内存不足 (OOM) 方法进行数据分析。我熟悉其他 OOM 方法,例如sparklyr和 ,DBI但我最近遇到arrow并希望对其进行更多探索。

问题是我通常使用的平面文件足够大,如果没有帮助,它们无法读入 R。因此,我理想情况下更喜欢一种无需首先将数据集读入 R 即可进行转换的方法。

您能提供的任何帮助将不胜感激!

csv import r parquet apache-arrow

6
推荐指数
1
解决办法
2065
查看次数

是否可以将行追加到现有的 Arrow (PyArrow) 表中?

我知道“许多 Arrow 对象是不可变的:一旦构造,它们的逻辑属性就不能再改变”(文档)。在《绿箭侠》创作者之一的这篇博文中,据说

Arrow C++ 中的表列可以分块,因此附加到表是零复制操作,不需要重要的计算或内存分配。

但是,我无法在文档中找到如何将行追加到表中。pyarrow.concat_tables(tables, promote=False)做了类似的事情,但据我了解,它会生成一个新的 Table 对象,而不是向现有对象添加块。

我不确定这个操作是否完全可能/有意义(在这种情况下我想知道如何进行)或者是否不可行(在这种情况下,pyarrow.concat_tables这正是我所需要的)。

类似问题:

apache-arrow pyarrow

6
推荐指数
1
解决办法
5261
查看次数