我正在处理熊猫和Spark数据帧。数据帧始终很大(> 20 GB),而标准的火花功能不足以容纳这些大小。目前,我将我的熊猫数据框转换为火花数据框,如下所示:
dataframe = spark.createDataFrame(pandas_dataframe)
Run Code Online (Sandbox Code Playgroud)
我进行这种转换是因为通过火花将数据帧写入hdfs非常容易:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Run Code Online (Sandbox Code Playgroud)
但是,对于大于2 GB的数据帧,转换失败。如果将spark数据框转换为熊猫,则可以使用pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
Run Code Online (Sandbox Code Playgroud)
这是从Spark到Pandas的快速会话,它也适用于大于2 GB的数据帧。我还找不到其他方法可以做到这一点。意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。问题是我真的找不到如何将熊猫数据帧写入hdfs。
我的熊猫版本:0.19.0
使用pandas 中的 IO 工具可以将 a 转换为DataFrame内存中的羽化缓冲区:
import pandas as pd
from io import BytesIO
df = pd.DataFrame({'a': [1,2], 'b': [3.0,4.0]})
buf = BytesIO()
df.to_feather(buf)
Run Code Online (Sandbox Code Playgroud)
但是,使用相同的缓冲区转换回 DataFrame
pd.read_feather(buf)
Run Code Online (Sandbox Code Playgroud)
结果报错:
ArrowInvalid:不是羽毛文件
如何将 DataFrame 转换为内存中的羽化表示,并相应地转换回 DataFrame?
预先感谢您的考虑和回应。
我在使用 Apache Arrow Spark 集成时遇到了这个问题。
使用 AWS EMR 和 Spark 2.4.3
在本地 Spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。
export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
Run Code Online (Sandbox Code Playgroud)
spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
Run Code Online (Sandbox Code Playgroud)
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)
使用 Python、Parquet 和 SparkArrowNotImplementedError: Support for codec 'snappy' not built升级到pyarrow=3.0.0. 我以前没有这个错误的版本是pyarrow=0.17. 该错误未出现在 中pyarrow=1.0.1,但确实出现在pyarrow=2.0.0. 这个想法是使用 Snappy 压缩将 Pandas DataFrame 编写为 Parquet 数据集(在 Windows 上),然后使用 Spark 处理 Parquet 数据集。
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
df = pd.DataFrame({
'x': [0, 0, 0, 1, 1, 1],
'a': np.random.random(6),
'b': np.random.random(6)})
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(table, root_path=r'c:/data', partition_cols=['x'], flavor='spark')
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用arrow包(最近实现的) DataSet API 将文件目录读入内存,并利用c++后端来过滤行和列。我想arrow直接使用包函数,而不是样式动词的包装函数dplyr。截至目前,这些函数还处于其生命周期的早期阶段,因此我很难找到一些说明语法的示例。
为了理解语法,我创建了一个非常小的测试示例。前两个查询按预期工作。
library(arrow) ## version 4.0.0
write.csv(mtcars,"ArrowTest_mtcars/mtcars.csv")
## Define a dataset object
DS <- arrow::open_dataset(sources = "ArrowTest_mtcars", format = "text")
## Generate a basic scanner
AT <- DS$NewScan()$UseThreads()$Finish()$ToTable()
head(as.data.frame(AT), n = 3)
## mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Mazda RX4 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
## 2 Mazda RX4 Wag 21.0 6 160 110 …Run Code Online (Sandbox Code Playgroud) 我正在尝试做什么
我正在使用PyArrow读取一些 CSV 并将它们转换为 Parquet。我读到的一些文件有很多列并且内存占用很大(足以使运行作业的机器崩溃)。我试图在读取 CSV 时对文件进行分块,其方式与 Pandas read_csv的chunksize工作方式类似。
例如,这就是分块代码在 pandas 中的工作方式:
chunks = pandas.read_csv(data, chunksize=100, iterator=True)
# Iterate through chunks
for chunk in chunks:
do_stuff(chunk)
Run Code Online (Sandbox Code Playgroud)
我想将类似的功能移植到 Arrow
我尝试过做什么
我注意到 Arrow 有ReadOptions其中包含一个block_size参数,我想也许我可以像这样使用它:
# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
input_file=input_buffer,
read_options=arrow_csv.ReadOptions(
use_threads=True,
block_size=4096
)
)
# Iterate through batches
for batch in arrow_table.to_batches():
do_stuff(batch)
Run Code Online (Sandbox Code Playgroud)
由于这个 ( block_size) 似乎没有返回迭代器,所以我的印象是这仍然会让 Arrow 读取内存中的整个表,从而重新创建我的问题。
最后,我知道我可以首先使用 Pandas 读取 csv 并对其进行分块,然后转换为 Arrow 表。但我试图避免使用 Pandas,只使用 Arrow。 …
我正在向非常熟悉 Apache Parquet 二进制布局的人寻求建议:
F(a) = b进行完全确定性的数据转换F,并使用整个软件堆栈(框架、箭头和 parquet 库)的相同版本 -每次保存到 Parquet 中时,我在不同主机上获得相同的数据帧二进制表示的可能性有多大?bb
换句话说,Parquet 在二进制级别上的可重现性如何?当数据逻辑上相同时,什么会导致二进制差异?
我正在开发一个系统,用于完全可重现和确定性的数据处理和计算数据集哈希来断言这些保证。
我的主要目标是确保数据集b包含一组相同的记录作为数据集b'- 这当然与对 Arrow/Parquet 的二进制表示进行哈希处理非常不同。不想处理存储格式的可重复性,我一直在计算逻辑数据哈希。这是缓慢但灵活的,例如,即使记录被重新排序(我认为是等效的数据集),我的哈希值也保持不变。
但是,当考虑与依赖于文件哈希的IPFS其他内容可寻址存储集成时,只有一个哈希(物理)而不是两个(逻辑+物理)会大大简化设计,但这意味着我必须保证Parquet 文件是可复制的。
我决定暂时继续使用逻辑哈希。
我创建了一个新的 Rust 箱arrow-digest,它实现了 Arrow 数组和记录批次的稳定哈希,并努力隐藏与编码相关的差异。如果有人发现哈希算法有用并希望用另一种语言实现它,则该板条箱的自述文件描述了该算法。
当我将其集成到我正在开发的去中心化数据处理工具中时,我将继续扩展支持的类型集。
从长远来看,我不确定逻辑哈希是否是最好的前进方式 - Parquet 的一个子集,它牺牲了一些效率,只是为了使文件布局具有确定性,这可能是内容可寻址性的更好选择。
我不知道从哪里开始,所以寻找一些指导。我正在寻找一种在一个进程中创建一些数组/表,并可以从另一个进程访问(只读)的方法。
所以我创建了一个pyarrow.Table这样的:
a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])
a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
# 0,
# 1,
# 2
# ]
a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
# "foo",
# "bar",
# "baz"
# ]
tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])
tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]
Run Code Online (Sandbox Code Playgroud)
现在我如何从不同的进程中读取它?我以为我会使用multiprocessing.shared_memory.SharedMemory,但这不太有效:
shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out: …Run Code Online (Sandbox Code Playgroud) 我遇到了越来越多的情况,需要在 R 中使用内存不足 (OOM) 方法进行数据分析。我熟悉其他 OOM 方法,例如sparklyr和 ,DBI但我最近遇到arrow并希望对其进行更多探索。
问题是我通常使用的平面文件足够大,如果没有帮助,它们无法读入 R。因此,我理想情况下更喜欢一种无需首先将数据集读入 R 即可进行转换的方法。
您能提供的任何帮助将不胜感激!
我知道“许多 Arrow 对象是不可变的:一旦构造,它们的逻辑属性就不能再改变”(文档)。在《绿箭侠》创作者之一的这篇博文中,据说
Arrow C++ 中的表列可以分块,因此附加到表是零复制操作,不需要重要的计算或内存分配。
但是,我无法在文档中找到如何将行追加到表中。pyarrow.concat_tables(tables, promote=False)做了类似的事情,但据我了解,它会生成一个新的 Table 对象,而不是向现有对象添加块。
我不确定这个操作是否完全可能/有意义(在这种情况下我想知道如何进行)或者是否不可行(在这种情况下,pyarrow.concat_tables这正是我所需要的)。
类似问题:
Table物体。可能来自read_csv操作或手动构建。apache-arrow ×10
pyarrow ×6
python ×4
parquet ×3
apache-spark ×2
pandas ×2
r ×2
amazon-emr ×1
csv ×1
feather ×1
import ×1
ipc ×1
pyspark ×1
python-3.x ×1