标签: pyarrow

如何将巨大的熊猫数据帧保存到HDFS?

我正在处理熊猫和Spark数据帧。数据帧始终很大(> 20 GB),而标准的火花功能不足以容纳这些大小。目前,我将我的熊猫数据框转换为火花数据框,如下所示:

dataframe = spark.createDataFrame(pandas_dataframe)  
Run Code Online (Sandbox Code Playgroud)

我进行这种转换是因为通过火花将数据帧写入hdfs非常容易:

dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
Run Code Online (Sandbox Code Playgroud)

但是,对于大于2 GB的数据帧,转换失败。如果将spark数据框转换为熊猫,则可以使用pyarrow:

// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)
Run Code Online (Sandbox Code Playgroud)

这是从Spark到Pandas的快速会话,它也适用于大于2 GB的数据帧。我还找不到其他方法可以做到这一点。意思是有一个熊猫数据框,我在pyarrow的帮助下将其转换为火花。问题是我真的找不到如何将熊猫数据帧写入hdfs。

我的熊猫版本:0.19.0

python pandas apache-spark apache-arrow pyarrow

7
推荐指数
1
解决办法
9058
查看次数

如何在 Alpine Docker 镜像上安装 pyarrow?

我正在尝试在我的 alpine docker 映像中使用 pip 安装 pyarrow,但 pip 无法找到该包。

我正在使用以下 Dockerfile:

FROM python:3.6-alpine3.7

RUN apk add --no-cache musl-dev linux-headers g++

RUN pip install pyarrow
Run Code Online (Sandbox Code Playgroud)

输出:

Sending build context to Docker daemon  4.096kB
Step 1/3 : FROM python:3.6-alpine3.7
3.6-alpine3.7: Pulling from library/python
ff3a5c916c92: Pull complete
471170bb1257: Pull complete
d487cc70216e: Pull complete
9358b3ca3321: Pull complete
78b9945f52f1: Pull complete
Digest: 
sha256:10bd7a59cfac2a784bedd1e6d89887995559f00b61f005a101845ed736bed779
Status: Downloaded newer image for python:3.6-alpine3.7
---> 4b00a94b6f26
Step 2/3 : RUN apk add --no-cache musl-dev linux-headers g++
---> Running in d024d0b961a6 …
Run Code Online (Sandbox Code Playgroud)

python docker alpine-linux pyarrow

7
推荐指数
2
解决办法
1万
查看次数

AWS EMR - ModuleNotFoundError:没有名为“pyarrow”的模块

我在使用 Apache Arrow Spark 集成时遇到了这个问题。

使用 AWS EMR 和 Spark 2.4.3

在本地 Spark 单机实例和 Cloudera 集群上测试了这个问题,一切正常。

在spark-env.sh中设置这些

export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3
Run Code Online (Sandbox Code Playgroud)

在 Spark shell 中确认了这一点

spark.version
2.4.3
sc.pythonExec
python3
SC.pythonVer
python3
Run Code Online (Sandbox Code Playgroud)

使用 apache arrow 集成运行基本的 pandas_udf 会导致错误

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)

aws emr 上出现错误 …

amazon-emr apache-spark pyspark apache-arrow pyarrow

7
推荐指数
1
解决办法
7253
查看次数

apache arrow 如何促进“跨系统通信无开销”?

由于“零复制读取”、“零 Serde”和“跨系统通信无开销”的承诺,我现在对 Apache Arrow 非常感兴趣。我对这个项目的理解(通过 pyarrow 的视角)是它描述了数据的内存格式,这样多个任务就可以像藏宝图一样读取它,并找到相同的数据(无需复制)。我想我可以在一个进程中看到它在 Python/Pandas 中是如何工作的;创建一个 Arrow 数组,将其传递给不同的对象,并观察整个“零复制”的运作过程非常容易。

然而,当我们谈论没有开销的跨系统通信时,我几乎完全迷失了。例如,PySpark 如何将 Java 对象转换为箭头格式,然后将其传递给 Python/Pandas?我试图查看这里的代码,但对于非 java/scala 人员来说,它看起来只是将 Spark 行转换为 Arrow 对象,然后转换为byteArrays (第 124 行),这看起来不像零复制、零开销大部头书。

同样,如果我想尝试将 Arrow 数组从 Python/pyarrow 传递到 Rust(使用 Rust 的 Arrow API),我无法思考如何做到这一点,特别是考虑到这种调用方法Python 中的 Rust 函数似乎不适用于 Arrow 原语。有没有办法将 Rust 和 Python 指向相同的内存地址?我是否必须以某种方式将箭头数据作为 byteArray 发送?

// lib.rs
#[macro_use]
extern crate cpython;

use cpython::{PyResult, Python};
use arrow::array::Int64Array;
use arrow::compute::array_ops::sum;

fn sum_col(_py: Python, val: Int64Array) -> PyResult<i64> {
    let total = …
Run Code Online (Sandbox Code Playgroud)

python rust pyspark apache-arrow pyarrow

7
推荐指数
1
解决办法
1721
查看次数

在大型数据集上运行 Pandas UDF 时出现问题

我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。

我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。

pandas UDF 是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)

绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。

我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?

集群配置如下:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • Spark.executor.cores 2
  • Spark.executor.memory 30g(为Python实例提供内存)
  • 火花.驱动.内存 43g

我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?

python apache-spark pyspark pyarrow

7
推荐指数
1
解决办法
5118
查看次数

使用pip_pypy3安装pyarrow时出现“Could NOT find Arrow”错误

我尝试使用 pypy3 安装 pyarrow,但出现一些错误。

基本信息如下:

macOS 10.15.7
Xcode 12.3
python version 3.7.9
pypy3 version 7.3.3 
pyarrow version 0.17.1
cmd is 'pip_pypy3 install pyarrow==0.17.1'
Run Code Online (Sandbox Code Playgroud)

日志中的一些关键信息和错误内容:

...
Requirement already satisfied: numpy>=1.14 in /usr/local/Cellar/pypy3/7.3.3/libexec/site-packages (from pyarrow==0.17.1) (1.19.5)
...
cmake -DPYTHON_EXECUTABLE=/usr/local/Cellar/pypy3/7.3.3/bin/pypy3  -DPYARROW_BUILD_CUDA=off -DPYARROW_BUILD_FLIGHT=off -DPYARROW_BUILD_GANDIVA=off -DPYARROW_BUILD_DATASET=off -DPYARROW_BUILD_ORC=off -DPYARROW_BUILD_PARQUET=off -DPYARROW_BUILD_PLASMA=off -DPYARROW_BUILD_S3=off -DPYARROW_BUILD_HDFS=off -DPYARROW_USE_TENSORFLOW=off -DPYARROW_BUNDLE_ARROW_CPP=off -DPYARROW_BUNDLE_BOOST=off -DPYARROW_GENERATE_COVERAGE=off -DPYARROW_BOOST_USE_SHARED=on -DPYARROW_PARQUET_USE_SHARED=on -DCMAKE_BUILD_TYPE=release /private/var/folders/7p/d9yrtx8s2h94h9bh3x801zmr0000gn/T/pip-install-jagh2frg/pyarrow_522bc325fbd74d9ebdf84f29e3a66c0c
...
2021-01-10T21:19:27,670   -- Found Python3: /Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7 (found version "3.7.9") found components: Interpreter Development NumPy Development.Module Development.Embed
2021-01-10T21:19:27,713   -- Found Python3Alt: /Library/Frameworks/Python.framework/Versions/3.7/bin/python3.7
2021-01-10T21:19:28,226   CMake Warning (dev) at …
Run Code Online (Sandbox Code Playgroud)

python pypy cmake pyarrow

7
推荐指数
1
解决办法
1万
查看次数

如何通过 pyarrow 使用用户定义的模式编写 Parquet

当我执行以下代码时 - 出现以下错误ValueError: Table schema does not match schema used to create file

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


fields = [
    ('one', pa.int64()),
    ('two', pa.string(), False),
    ('three', pa.bool_())
]
schema = pa.schema(fields)

schema = schema.remove_metadata()
df = pd.DataFrame(
    {
        'one': [2, 2, 2],
        'two': ['foo', 'bar', 'baz'],
        'three': [True, False, True]
    }
)

df['two'] = df['two'].astype(str)

table = pa.Table.from_pandas(df, schema, preserve_index=False).replace_schema_metadata()
writer = pq.ParquetWriter('parquest_user_defined_schema.parquet', schema=schema)
writer.write_table(table)
Run Code Online (Sandbox Code Playgroud)

python-3.x pyarrow

6
推荐指数
1
解决办法
9773
查看次数

使用 Array&lt;Map&lt;String,String&gt;&gt; 列读取 Parquet 文件

我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:

import pandas as pd

df = pd.DataFrame.from_records([ 
    (1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100), 
    (5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)], 
    columns=['uid', 'job_history', 'latency'] 
) 
Run Code Online (Sandbox Code Playgroud)

当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:

ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)

许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:

  • 一些如何告诉 dask/fastparquet 使用标准库解析列json。该模式很简单,如果可能的话就可以完成这项工作
  • 看看我是否可以重新运行生成输出的 Spark 作业并将其另存为其他内容,尽管这几乎不是一个可接受的解决方案,因为我的公司到处都使用镶木地板
  • 将映射的键转换为列,并使用 dtype 将数据分解为多个列,list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(

我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。

python dask python-3.7 fastparquet pyarrow

6
推荐指数
1
解决办法
1万
查看次数

为什么分区 parquet 文件会占用更大的磁盘空间?

我正在学习使用 python 和 pyarrow 的镶木地板文件。Parquet 在压缩和最小化磁盘空间方面非常出色。snappy我的数据集是 190MB csv 文件,当保存为压缩 parquet 文件时,最终会成为单个 3MB 文件。

然而,当我将数据集保存为分区文件时,它们会导致组合大小更大(61MB)。

这是我尝试保存的示例数据集:

listing_id |     date     | gender | price
-------------------------------------------
     a     |  2019-01-01  |   M    |   100
     b     |  2019-01-02  |   M    |   100
     c     |  2019-01-03  |   F    |   200
     d     |  2019-01-04  |   F    |   200

Run Code Online (Sandbox Code Playgroud)

当我按日期(300 多个唯一值)分区时,分区文件的总大小为 61MB。每个文件都有168.2kB大小。当我按性别(2 个唯一值)分区时,分区文件的总大小仅为 3MB。

我想知道镶木地板是否有最小文件大小,这样许多小文件组合起来会消耗更大的磁盘空间?

我的环境:

- OS: Ubuntu 18.04
- Language: Python
- Library: pyarrow, pandas
Run Code Online (Sandbox Code Playgroud)

我的数据集来源:

https://www.kaggle.com/brittabettendorf/berlin-airbnb-data

# I am using calendar_summary.csv …
Run Code Online (Sandbox Code Playgroud)

python parquet pyarrow

6
推荐指数
1
解决办法
2239
查看次数

将 DataFrame 加载到 BigQuery 表时出错(pyarrow.lib.ArrowTypeError:&lt;class 'str'&gt; 类型的对象无法转换为 int)

我在 GCS 中存储了一个 CSV 文件,我想将其加载到 BigQuery 表中。但我需要先进行一些预处理,所以我将其加载到 DataFrame,然后加载到 BigQuery 表

import pandas as pd
import json 
from google.cloud import bigquery


cols_name_list = [....]. # column name in order
uri = "gs://<bucket>/<path>/<csv_file>"
df = pd.read_csv(uri, dtype="string")
df =df.reindex(columns=cols_name_list)

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
   ... # added all schema field according to table column type
)

job = client.load_table_from_dataframe(
    df, "<bq_table_id>", job_config=job_config
)
job.result()
Run Code Online (Sandbox Code Playgroud)

从上面的代码中,我对数据帧列顺序进行了重新排序,以与 BigQuery 表中的顺序相匹配(不确定这是否重要),并将所有列转换为字符串类型。

我收到此错误,如下所示

pyarrow.lib.ArrowInvalid: Could not convert '47803' with type str: tried to convert to int …
Run Code Online (Sandbox Code Playgroud)

python numpy pandas google-bigquery pyarrow

6
推荐指数
1
解决办法
2万
查看次数