我正在处理熊猫和Spark数据帧。数据帧始终很大(> 20 GB),而标准的火花功能不足以容纳这些大小。目前,我将我的熊猫数据框转换为火花数据框,如下所示:
dataframe = spark.createDataFrame(pandas_dataframe)
Run Code Online (Sandbox Code Playgroud)
我进行这种转换是因为通过火花将数据帧写入hdfs非常容易:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Run Code Online (Sandbox Code Playgroud)
但是,对于大于2 GB的数据帧,转换失败。如果将spark数据框转换为熊猫,则可以使用pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
Run Code Online (Sandbox Code Playgroud)
这是从Spark到Pandas的快速会话,它也适用于大于2 GB的数据帧。我还找不到其他方法可以做到这一点。意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。问题是我真的找不到如何将熊猫数据帧写入hdfs。
我的熊猫版本:0.19.0
我正在尝试在我的 alpine docker 映像中使用 pip 安装 pyarrow,但 pip 无法找到该包。
我正在使用以下 Dockerfile:
FROM python:3.6-alpine3.7
RUN apk add --no-cache musl-dev linux-headers g++
RUN pip install pyarrow
Run Code Online (Sandbox Code Playgroud)
输出:
Sending build context to Docker daemon 4.096kB
Step 1/3 : FROM python:3.6-alpine3.7
3.6-alpine3.7: Pulling from library/python
ff3a5c916c92: Pull complete
471170bb1257: Pull complete
d487cc70216e: Pull complete
9358b3ca3321: Pull complete
78b9945f52f1: Pull complete
Digest:
sha256:10bd7a59cfac2a784bedd1e6d89887995559f00b61f005a101845ed736bed779
Status: Downloaded newer image for python:3.6-alpine3.7
---> 4b00a94b6f26
Step 2/3 : RUN apk add --no-cache musl-dev linux-headers g++
---> Running in d024d0b961a6 …Run Code Online (Sandbox Code Playgroud) 我在使用 Apache Arrow Spark 集成时遇到了这个问题。
使用 AWS EMR 和 Spark 2.4.3
在本地 Spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。
export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
Run Code Online (Sandbox Code Playgroud)
spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
Run Code Online (Sandbox Code Playgroud)
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)
由于“零复制读取”、“零 Serde”和“跨系统通信无开销”的承诺,我现在对 Apache Arrow 非常感兴趣。我对这个项目的理解(通过 pyarrow 的视角)是它描述了数据的内存和格式,这样多个任务就可以像藏宝图一样读取它,并找到相同的数据(无需复制)。我想我可以在一个进程中看到它在 Python/Pandas 中是如何工作的;创建一个 Arrow 数组,将其传递给不同的对象,并观察整个“零复制”的运作过程非常容易。
然而,当我们谈论没有开销的跨系统通信时,我几乎完全迷失了。例如,PySpark 如何将 Java 对象转换为箭头格式,然后将其传递给 Python/Pandas?我试图查看这里的代码,但对于非 java/scala 人员来说,它看起来只是将 Spark 行转换为 Arrow 对象,然后转换为byteArrays (第 124 行),这看起来不像零复制、零开销大部头书。
同样,如果我想尝试将 Arrow 数组从 Python/pyarrow 传递到 Rust(使用 Rust 的 Arrow API),我无法思考如何做到这一点,特别是考虑到这种调用方法Python 中的 Rust 函数似乎不适用于 Arrow 原语。有没有办法将 Rust 和 Python 指向相同的内存地址?我是否必须以某种方式将箭头数据作为 byteArray 发送?
// lib.rs
#[macro_use]
extern crate cpython;
use cpython::{PyResult, Python};
use arrow::array::Int64Array;
use arrow::compute::array_ops::sum;
fn sum_col(_py: Python, val: Int64Array) -> PyResult<i64> {
let total = …Run Code Online (Sandbox Code Playgroud) 我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)
绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?
集群配置如下:
我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?
我尝试使用 pypy3 安装 pyarrow,但出现一些错误。
基本信息如下:
macOS 10.15.7
Xcode 12.3
python version 3.7.9
pypy3 version 7.3.3
pyarrow version 0.17.1
cmd is 'pip_pypy3 install pyarrow==0.17.1'
Run Code Online (Sandbox Code Playgroud)
日志中的一些关键信息和错误内容:
...
Requirement already satisfied: numpy>=1.14 in /usr/local/Cellar/pypy3/7.3.3/libexec/site-packages (from pyarrow==0.17.1) (1.19.5)
...
cmake -DPYTHON_EXECUTABLE=/usr/local/Cellar/pypy3/7.3.3/bin/pypy3 -DPYARROW_BUILD_CUDA=off -DPYARROW_BUILD_FLIGHT=off -DPYARROW_BUILD_GANDIVA=off -DPYARROW_BUILD_DATASET=off -DPYARROW_BUILD_ORC=off -DPYARROW_BUILD_PARQUET=off -DPYARROW_BUILD_PLASMA=off -DPYARROW_BUILD_S3=off -DPYARROW_BUILD_HDFS=off -DPYARROW_USE_TENSORFLOW=off -DPYARROW_BUNDLE_ARROW_CPP=off -DPYARROW_BUNDLE_BOOST=off -DPYARROW_GENERATE_COVERAGE=off -DPYARROW_BOOST_USE_SHARED=on -DPYARROW_PARQUET_USE_SHARED=on -DCMAKE_BUILD_TYPE=release /private/var/folders/7p/d9yrtx8s2h94h9bh3x801zmr0000gn/T/pip-install-jagh2frg/pyarrow_522bc325fbd74d9ebdf84f29e3a66c0c
...
2021-01-10T21:19:27,670 -- Found Python3: /Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7 (found version "3.7.9") found components: Interpreter Development NumPy Development.Module Development.Embed
2021-01-10T21:19:27,713 -- Found Python3Alt: /Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7
2021-01-10T21:19:28,226 CMake Warning (dev) at …Run Code Online (Sandbox Code Playgroud) 当我执行以下代码时 - 出现以下错误ValueError: Table schema does not match schema used to create file。
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
fields = [
('one', pa.int64()),
('two', pa.string(), False),
('three', pa.bool_())
]
schema = pa.schema(fields)
schema = schema.remove_metadata()
df = pd.DataFrame(
{
'one': [2, 2, 2],
'two': ['foo', 'bar', 'baz'],
'three': [True, False, True]
}
)
df['two'] = df['two'].astype(str)
table = pa.Table.from_pandas(df, schema, preserve_index=False).replace_schema_metadata()
writer = pq.ParquetWriter('parquest_user_defined_schema.parquet', schema=schema)
writer.write_table(table)
Run Code Online (Sandbox Code Playgroud) 我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:
import pandas as pd
df = pd.DataFrame.from_records([
(1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100),
(5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)],
columns=['uid', 'job_history', 'latency']
)
Run Code Online (Sandbox Code Playgroud)
当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:
ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)
许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:
json。该模式很简单,如果可能的话就可以完成这项工作list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。
我正在学习使用 python 和 pyarrow 的镶木地板文件。Parquet 在压缩和最小化磁盘空间方面非常出色。snappy我的数据集是 190MB csv 文件,当保存为压缩 parquet 文件时,最终会成为单个 3MB 文件。
然而,当我将数据集保存为分区文件时,它们会导致组合大小更大(61MB)。
这是我尝试保存的示例数据集:
listing_id | date | gender | price
-------------------------------------------
a | 2019-01-01 | M | 100
b | 2019-01-02 | M | 100
c | 2019-01-03 | F | 200
d | 2019-01-04 | F | 200
Run Code Online (Sandbox Code Playgroud)
当我按日期(300 多个唯一值)分区时,分区文件的总大小为 61MB。每个文件都有168.2kB大小。当我按性别(2 个唯一值)分区时,分区文件的总大小仅为 3MB。
我想知道镶木地板是否有最小文件大小,这样许多小文件组合起来会消耗更大的磁盘空间?
我的环境:
- OS: Ubuntu 18.04
- Language: Python
- Library: pyarrow, pandas
Run Code Online (Sandbox Code Playgroud)
我的数据集来源:
https://www.kaggle.com/brittabettendorf/berlin-airbnb-data
# I am using calendar_summary.csv …Run Code Online (Sandbox Code Playgroud) 我在 GCS 中存储了一个 CSV 文件,我想将其加载到 BigQuery 表中。但我需要先进行一些预处理,所以我将其加载到 DataFrame,然后加载到 BigQuery 表
import pandas as pd
import json
from google.cloud import bigquery
cols_name_list = [....]. # column name in order
uri = "gs://<bucket>/<path>/<csv_file>"
df = pd.read_csv(uri, dtype="string")
df =df.reindex(columns=cols_name_list)
client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
... # added all schema field according to table column type
)
job = client.load_table_from_dataframe(
df, "<bq_table_id>", job_config=job_config
)
job.result()
Run Code Online (Sandbox Code Playgroud)
从上面的代码中,我对数据帧列顺序进行了重新排序,以与 BigQuery 表中的顺序相匹配(不确定这是否重要),并将所有列转换为字符串类型。
我收到此错误,如下所示
pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int …Run Code Online (Sandbox Code Playgroud) pyarrow ×10
python ×8
apache-arrow ×3
apache-spark ×3
pyspark ×3
pandas ×2
alpine-linux ×1
amazon-emr ×1
cmake ×1
dask ×1
docker ×1
fastparquet ×1
numpy ×1
parquet ×1
pypy ×1
python-3.7 ×1
python-3.x ×1
rust ×1