当我将 pyarrow 设置为 true 时,我们使用 Spark 会话,但是当我运行 toPandas() 时,它会抛出错误:
"toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to disable this"
Run Code Online (Sandbox Code Playgroud)
我可以知道为什么会这样吗?
有没有办法强制镶木地板文件将pd.DataFrame列编码为给定类型,即使该列的所有值都为空?镶木地板在其模式中自动分配"null"的事实阻止我将许多文件加载到单个文件中dask.dataframe.
试图使用pandas列投射df.column_name = df.column_name.astype(sometype)不起作用.
我为什么这么问
我想将许多镶木地板文件加载到一个单独的dask.dataframe.所有文件都是pd.DataFrame使用多个实例生成的df.to_parquet(filename).所有数据帧都具有相同的列,但对于某些列,给定列可能只包含空值.当试图将所有文件加载到dask.dataframe(使用时df = dd.read_parquet('*.parquet'),我得到以下错误:
Schema in filename.parquet was different.
id: int64
text: string
[...]
some_column: double
vs
id: int64
text: string
[...]
some_column: null
Run Code Online (Sandbox Code Playgroud)
重现我的问题的步骤
import pandas as pd
import dask.dataframe as dd
a = pd.DataFrame(['1', '1'], columns=('value',))
b = pd.DataFrame([None, None], columns=('value',))
a.to_parquet('a.parquet')
b.to_parquet('b.parquet')
df = dd.read_parquet('*.parquet') # Reads a and b
Run Code Online (Sandbox Code Playgroud)
这给了我以下内容:
ValueError: Schema in path/to/b.parquet was different. …Run Code Online (Sandbox Code Playgroud) 我正在尝试pyarrow在 OSX 11.0.1 上使用 pip3进行安装,并收到错误消息。
我正在使用 Python 3.9,但不确定这是否是问题所在。
这是错误摘要:
ERROR: Command errored out with exit status 1:
command: /usr/local/opt/python@3.9/bin/python3.9 /usr/local/lib/python3.9/site-packages/pip install --ignore-installed --no-user --prefix /private/var/folders/mk/2fgx_1s96zjd1r9xzhs2ht_00000gn/T/pip-build-env-pev1z3i2/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'cython >= 0.29' 'numpy==1.14.5; python_version<'"'"'3.7'"'"'' 'numpy==1.16.0; python_version>='"'"'3.7'"'"'' setuptools setuptools_scm wheel
cwd: None
Run Code Online (Sandbox Code Playgroud)
这是完整的(很长)错误输出,我不知道如何阅读它以找到问题的根源:https : //pastebin.com/RQcEuwbz
看起来pyarrow可能在 Python 3.9 上有问题,但现在应该已经修复了这些问题?我对接下来要做什么感到有些困惑。
更新:啊,找到了一个 PR 使 pyarrow 与尚未合并的Python 3.9 一起工作- 所以我猜 pyarrow 的 pip 版本不适用于 Python 3.9,这就是问题所在。看起来 PR …
我正在寻找numpy使用 来存储和检索数组的快速方法pyarrow。我对检索非常满意。.arrow从我的文件中提取包含 1.000.000.000 个整数的列只需不到 1 秒的时间dtype = np.uint16。
import pyarrow as pa\nimport numpy as np\n\ndef write(arr, name):\n arrays = [pa.array(col) for col in arr]\n names = [str(i) for i in range(len(arrays))]\n batch = pa.RecordBatch.from_arrays(arrays, names=names)\n with pa.OSFile(name, 'wb') as sink:\n with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:\n writer.write_batch(batch)\n\ndef read(name):\n source = pa.memory_map(name, 'r')\n table = pa.ipc.RecordBatchStreamReader(source).read_all()\n for i in range(table.num_columns):\n yield table.column(str(i)).to_numpy()\n\narr = np.random.randint(65535, size=(250, 4000000), dtype=np.uint16)\n\n%%timeit -r 1 -n 1\nwrite(arr, 'test.arrow')\n>>> 25.6 …Run Code Online (Sandbox Code Playgroud) 我目前正在使用PySpark开发我的第一个整个系统,并且遇到了一些奇怪的与内存相关的问题。在其中一个阶段中,我想类似于“拆分应用合并”策略以修改DataFrame。也就是说,我想对给定列定义的每个组应用一个函数,最后将它们全部合并。问题是,我要应用的函数是一种针对拟合模型的预测方法,该模型“说出”了熊猫的成语,即将其矢量化并以熊猫系列作为输入。
然后,我设计了一种迭代策略,遍历各个组并手动应用pandas_udf.Scalar来解决问题。组合部分使用对DataFrame.unionByName()的增量调用完成。我决定不使用pandas_udf的GroupedMap类型,因为文档指出该内存应由用户管理,并且只要其中一个组太大而无法保存在内存中或由一组表示,则应格外小心熊猫DataFrame。
主要问题是所有处理似乎都可以正常运行,但最后我想将最终的DataFrame序列化为Parquet文件。在这一点上,我收到了许多关于DataFrameWriter的类似Java的错误或内存不足异常。
我已经在Windows和Linux机器上尝试过该代码。我设法避免错误的唯一方法是增加机器中的--driver-memory值。最小值在每个平台上都不同,并且取决于问题的大小,这使我怀疑内存泄漏。
直到我开始使用pandas_udf时,问题才发生。我认为在使用pandas_udf进行的pyarrow序列化的整个过程中,可能在某处内存泄漏。
我创建了一个最小的可复制示例。如果我直接使用Python运行此脚本,则会产生错误。使用提交火花并增加大量驱动程序内存,可以使其正常工作。
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp
# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
return x + 100.0
# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
"mre").master("local[3]").getOrCreate()
sc = spark.sparkContext
# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"
z = 105
m = 750000
schema = spktyp.StructType(
[spktyp.StructField("ID", spktyp.DoubleType(), True)]
)
df = spark.createDataFrame(
[(float(i),) for i in range(m)],
schema
)
for j in range(z):
df = df.withColumn(
f"N{j}",
F.col("ID") + float(j)
) …Run Code Online (Sandbox Code Playgroud) 我有一个存储在 s3 上的镶木地板数据集,我想从数据集中查询特定行。我能够使用petastorm它来做到这一点,但现在我只想使用pyarrow.
这是我的尝试:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
'analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.read_pandas().to_pandas()
Run Code Online (Sandbox Code Playgroud)
但这会返回一个pandas DataFrame,就好像过滤器不起作用一样,即我有具有不同值的行event_name。有什么我遗漏的或我误解的吗?我可以在获得 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存空间。
在 Python 中创建 Parquet 文件的最常见方法似乎是首先创建 Pandas 数据框,然后使用 pyarrow 将表写入 Parquet。我担心这可能会导致内存使用量过大 - 因为它需要至少将数据集的一份完整副本存储在内存中才能创建 pandas 数据帧。
我想知道是否由于列压缩要求而需要将整个数据集加载到内存中,或者是否有更高效且基于流的方法。就我而言,我将以流媒体方式接收记录。对于类似的 csv 输出过程,我们以 1000 为批量将行写入磁盘,因此需要在内存中保存的行数永远不会达到完整数据集的大小。
我是不是该...?:
想法?建议?
请考虑以下程序作为最小可重现示例 -MRE:
import pandas as pd
import pyarrow
from pyarrow import parquet
def foo():
print(pyarrow.__file__)
print('version:',pyarrow.cpp_version)
print('-----------------------------------------------------')
df = pd.DataFrame({'A': [1,2,3], 'B':['dummy']*3})
print('Orignal DataFrame:\n', df)
print('-----------------------------------------------------')
_table = pyarrow.Table.from_pandas(df)
parquet.write_table(_table, 'foo')
_table = parquet.read_table('foo', columns=[]) #passing empty list to columns arg
df = _table.to_pandas()
print('After reading from file with columns=[]:\n', df)
print('-----------------------------------------------------')
print('Not passing [] to columns parameter')
_table = parquet.read_table('foo') #Not passing any list
df = _table.to_pandas()
print(df)
print('-----------------------------------------------------')
x = input('press any key to exit: ') …Run Code Online (Sandbox Code Playgroud) 考虑以下数据框
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa
idx = pd.date_range('2017-01-01 12:00:00.000', '2017-03-01 12:00:00.000', freq = 'T')
dataframe = pd.DataFrame({'numeric_col' : np.random.rand(len(idx)),
'string_col' : pd.util.testing.rands_array(8,len(idx))},
index = idx)
dataframe
Out[30]:
numeric_col string_col
2017-01-01 12:00:00 0.4069 wWw62tq6
2017-01-01 12:01:00 0.2050 SleB4f6K
2017-01-01 12:02:00 0.5180 cXBvEXdh
2017-01-01 12:03:00 0.3069 r9kYsJQC
2017-01-01 12:04:00 0.3571 F2JjUGgO
2017-01-01 12:05:00 0.3170 8FPC4Pgz
2017-01-01 12:06:00 0.9454 ybeNnZGV
2017-01-01 12:07:00 0.3353 zSLtYPWF
2017-01-01 12:08:00 0.8510 tDZJrdMM
2017-01-01 12:09:00 0.4948 …Run Code Online (Sandbox Code Playgroud) 我尝试在终端和 juypter 实验室中安装它,它说它已成功安装,但是当我运行 df = query_job.to_dataframe() 时,我不断收到错误“ ValueError: The pyarrow library is not installed, please install pyarrow使用 to_arrow() 函数。”。我不知道如何解决这个问题。有什么建议吗?我试图最终使用代码从谷歌数据工作室访问数据,
from google.cloud import bigquery
import pandas
import numpy
import pyarrow
bigquery_client = bigquery.Client()
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] ='full file path here.json'
QUERY = """
SELECT *
FROM `warehouse`
LIMIT 100
"""
query_job = bigquery_client.query(QUERY)
df = query_job.to_dataframe()
Run Code Online (Sandbox Code Playgroud) pyarrow ×10
python ×8
pandas ×5
parquet ×5
pyspark ×2
amazon-s3 ×1
dask ×1
fastparquet ×1
jupyter ×1
numpy ×1
pyinstaller ×1
pyspark-sql ×1