标签: pyarrow

pyarrow 错误:toPandas 尝试进行 Arrow 优化

当我将 pyarrow 设置为 true 时,我们使用 Spark 会话,但是当我运行 toPandas() 时,它会抛出错误:

"toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to disable this"
Run Code Online (Sandbox Code Playgroud)

我可以知道为什么会这样吗?

pyspark pyarrow

12
推荐指数
1
解决办法
4万
查看次数

保存pd.DataFrame时如何强制镶木地板dtypes?

有没有办法强制镶木地板文件将pd.DataFrame列编码为给定类型,即使该列的所有值都为空?镶木地板在其模式中自动分配"null"的事实阻止我将许多文件加载到单个文件中dask.dataframe.

试图使用pandas列投射df.column_name = df.column_name.astype(sometype)不起作用.

我为什么这么问

我想将许多镶木地板文件加载到一个单独的dask.dataframe.所有文件都是pd.DataFrame使用多个实例生成的df.to_parquet(filename).所有数据帧都具有相同的列,但对于某些列,给定列可能只包含空值.当试图将所有文件加载到dask.dataframe(使用时df = dd.read_parquet('*.parquet'),我得到以下错误:

Schema in filename.parquet was different.
id: int64
text: string
[...]
some_column: double

vs

id: int64
text: string
[...]
some_column: null
Run Code Online (Sandbox Code Playgroud)

重现我的问题的步骤

import pandas as pd
import dask.dataframe as dd
a = pd.DataFrame(['1', '1'], columns=('value',))
b = pd.DataFrame([None, None], columns=('value',))
a.to_parquet('a.parquet')
b.to_parquet('b.parquet')
df = dd.read_parquet('*.parquet')  # Reads a and b
Run Code Online (Sandbox Code Playgroud)

这给了我以下内容:

ValueError: Schema in path/to/b.parquet was different. …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet dask pyarrow

11
推荐指数
1
解决办法
2517
查看次数

无法在 OSX/Python 3.9 上安装 pyarrow:这是我还是不兼容的包?

我正在尝试pyarrow在 OSX 11.0.1 上使用 pip3进行安装,并收到错误消息。

我正在使用 Python 3.9,但不确定这是否是问题所在。

这是错误摘要:

  ERROR: Command errored out with exit status 1:
   command: /usr/local/opt/python@3.9/bin/python3.9 /usr/local/lib/python3.9/site-packages/pip install --ignore-installed --no-user --prefix /private/var/folders/mk/2fgx_1s96zjd1r9xzhs2ht_00000gn/T/pip-build-env-pev1z3i2/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'cython >= 0.29' 'numpy==1.14.5; python_version<'"'"'3.7'"'"'' 'numpy==1.16.0; python_version>='"'"'3.7'"'"'' setuptools setuptools_scm wheel
       cwd: None
Run Code Online (Sandbox Code Playgroud)

这是完整的(很长)错误输出,我不知道如何阅读它以找到问题的根源:https : //pastebin.com/RQcEuwbz

看起来pyarrow可能在 Python 3.9 上有问题,但现在应该已经修复了这些问题?我对接下来要做什么感到有些困惑。

更新:啊,找到了一个 PR 使 pyarrow 与尚未合并的Python 3.9 一起工作- 所以我猜 pyarrow 的 pip 版本不适用于 Python 3.9,这就是问题所在。看起来 PR …

python pyarrow

11
推荐指数
2
解决办法
4395
查看次数

以箭头格式编写 numpy 数组的最快方法

我正在寻找numpy使用 来存储和检索数组的快速方法pyarrow。我对检索非常满意。.arrow从我的文件中提取包含 1.000.000.000 个整数的列只需不到 1 秒的时间dtype = np.uint16

\n
import pyarrow as pa\nimport numpy as np\n\ndef write(arr, name):\n    arrays = [pa.array(col) for col in arr]\n    names = [str(i) for i in range(len(arrays))]\n    batch = pa.RecordBatch.from_arrays(arrays, names=names)\n    with pa.OSFile(name, 'wb') as sink:\n        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:\n            writer.write_batch(batch)\n\ndef read(name):\n    source = pa.memory_map(name, 'r')\n    table = pa.ipc.RecordBatchStreamReader(source).read_all()\n    for i in range(table.num_columns):\n        yield table.column(str(i)).to_numpy()\n\narr = np.random.randint(65535, size=(250, 4000000), dtype=np.uint16)\n\n%%timeit -r 1 -n 1\nwrite(arr, 'test.arrow')\n>>> 25.6 …
Run Code Online (Sandbox Code Playgroud)

python numpy pyarrow

11
推荐指数
1
解决办法
3244
查看次数

使用pandas_udf和Parquet序列化时发生内存泄漏?

我目前正在使用PySpark开发我的第一个整个系统,并且遇到了一些奇怪的与内存相关的问题。在其中一个阶段中,我想类似于“拆分应用合并”策略以修改DataFrame。也就是说,我想对给定列定义的每个组应用一个函数,最后将它们全部合并。问题是,我要应用的函数是一种针对拟合模型的预测方法,该模型“说出”了熊猫的成语,即将其矢量化并以熊猫系列作为输入。

然后,我设计了一种迭代策略,遍历各个组并手动应用pandas_udf.Scalar来解决问题。组合部分使用对DataFrame.unionByName()的增量调用完成。我决定不使用pandas_udf的GroupedMap类型,因为文档指出该内存应由用户管理,并且只要其中一个组太大而无法保存在内存中或由一组表示,则应格外小心熊猫DataFrame。

主要问题是所有处理似乎都可以正常运行,但最后我想将最终的DataFrame序列化为Parquet文件。在这一点上,我收到了许多关于DataFrameWriter的类似Java的错误或内存不足异常。

我已经在Windows和Linux机器上尝试过该代码。我设法避免错误的唯一方法是增加机器中的--driver-memory值。最小值在每个平台上都不同,并且取决于问题的大小,这使我怀疑内存泄漏。

直到我开始使用pandas_udf时,问题才发生。我认为在使用pandas_udf进行的pyarrow序列化的整个过程中,可能在某处内存泄漏。

我创建了一个最小的可复制示例。如果我直接使用Python运行此脚本,则会产生错误。使用提交火花并增加大量驱动程序内存,可以使其正常工作。

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
        "mre").master("local[3]").getOrCreate()

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)

for j in range(z):
    df = df.withColumn(
        f"N{j}",
        F.col("ID") + float(j)
    ) …
Run Code Online (Sandbox Code Playgroud)

python pandas pyspark pyspark-sql pyarrow

10
推荐指数
1
解决办法
372
查看次数

使用谓词从 pyarrow.parquet.ParquetDataset 过滤行

我有一个存储在 s3 上的镶木地板数据集,我想从数据集中查询特定行。我能够使用petastorm它来做到这一点,但现在我只想使用pyarrow.

这是我的尝试:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()
Run Code Online (Sandbox Code Playgroud)

但这会返回一个pandas DataFrame,就好像过滤器不起作用一样,即我有具有不同值的行event_name。有什么我遗漏的或我误解的吗?我可以在获得 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存空间。

python amazon-s3 pandas parquet pyarrow

10
推荐指数
4
解决办法
1万
查看次数

以内存高效的方式从 python 中的流创建 Parquet 文件

在 Python 中创建 Parquet 文件的最常见方法似乎是首先创建 Pandas 数据框,然后使用 pyarrow 将表写入 Parquet。我担心这可能会导致内存使用量过大 - 因为它需要至少将数据集的一份完整副本存储在内存中才能创建 pandas 数据帧。

我想知道是否由于列压缩要求而需要将整个数据集加载到内存中,或者是否有更高效且基于流的方法。就我而言,我将以流媒体方式接收记录。对于类似的 csv 输出过程,我们以 1000 为批量将行写入磁盘,因此需要在内存中保存的行数永远不会达到完整数据集的大小。

我是不是该...?:

  1. 只需创建一个 pandas 数据框,然后将其写入镶木地板。(这意味着整个数据集需要存储在内存中,但我们将此视为必要要求。)
  2. 使用一些流友好的方式在我们收到它们时一次写入 1000 行左右,从而最大限度地减少整个过程中总的时间点 ram 消耗。(我没有看到任何有关如何执行此操作的文档,而且我不确定它是否是镶木地板的选项。)
  3. 将所有内容写入 CSV,然后使用智能读取/分析 CSV 内容并在事后创建压缩镶木地板的函数。(运行时间可能较慢,但内存配置文件较低,并且在非常大的文件上失败的机会较低。)

想法?建议?

python parquet fastparquet pyarrow

10
推荐指数
1
解决办法
5740
查看次数

在相同环境下使用 CLI 与可执行文件从 parquet 读取 DataFrame 时的不同行为

请考虑以下程序作为最小可重现示例 -MRE

import pandas as pd
import pyarrow
from pyarrow import parquet

def foo():
    print(pyarrow.__file__)
    print('version:',pyarrow.cpp_version)
    print('-----------------------------------------------------')
    df = pd.DataFrame({'A': [1,2,3], 'B':['dummy']*3})
    print('Orignal DataFrame:\n', df)
    print('-----------------------------------------------------')
    _table = pyarrow.Table.from_pandas(df)
    parquet.write_table(_table, 'foo')
    _table = parquet.read_table('foo', columns=[])    #passing empty list to columns arg
    df = _table.to_pandas()
    print('After reading from file with columns=[]:\n', df)
    print('-----------------------------------------------------')
    print('Not passing [] to columns parameter')
    _table = parquet.read_table('foo')                #Not passing any list
    df = _table.to_pandas()
    print(df)
    print('-----------------------------------------------------')
    x = input('press any key to exit: ') …
Run Code Online (Sandbox Code Playgroud)

python pyinstaller pandas parquet pyarrow

10
推荐指数
1
解决办法
420
查看次数

如何有效地将大型数据帧拆分为许多镶木地板文件?

考虑以下数据框

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa

idx = pd.date_range('2017-01-01 12:00:00.000', '2017-03-01 12:00:00.000', freq = 'T')

dataframe = pd.DataFrame({'numeric_col' : np.random.rand(len(idx)),
                          'string_col' : pd.util.testing.rands_array(8,len(idx))},
                           index = idx)

dataframe
Out[30]: 
                     numeric_col string_col
2017-01-01 12:00:00       0.4069   wWw62tq6
2017-01-01 12:01:00       0.2050   SleB4f6K
2017-01-01 12:02:00       0.5180   cXBvEXdh
2017-01-01 12:03:00       0.3069   r9kYsJQC
2017-01-01 12:04:00       0.3571   F2JjUGgO
2017-01-01 12:05:00       0.3170   8FPC4Pgz
2017-01-01 12:06:00       0.9454   ybeNnZGV
2017-01-01 12:07:00       0.3353   zSLtYPWF
2017-01-01 12:08:00       0.8510   tDZJrdMM
2017-01-01 12:09:00       0.4948 …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet pyarrow

9
推荐指数
1
解决办法
4474
查看次数

导入 pyarrow 不起作用 &lt;- 错误是“ValueError:未安装 pyarrow 库,请安装 pyarrow 以使用 to_arrow() 函数。”

我尝试在终端和 juypter 实验室中安装它,它说它已成功安装,但是当我运行 df = query_job.to_dataframe() 时,我不断收到错误“ ValueError: The pyarrow library is not installed, please install pyarrow使用 to_arrow() 函数。”。我不知道如何解决这个问题。有什么建议吗?我试图最终使用代码从谷歌数据工作室访问数据,

from google.cloud import bigquery
import pandas
import numpy
import pyarrow
bigquery_client = bigquery.Client()
import os 
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] ='full file path here.json'
QUERY = """
SELECT * 
FROM `warehouse`
LIMIT 100
"""
query_job = bigquery_client.query(QUERY)
df = query_job.to_dataframe()
Run Code Online (Sandbox Code Playgroud)

google-bigquery jupyter pyarrow

9
推荐指数
2
解决办法
6530
查看次数