标签: pyarrow

Pyarrow 在使用 Pandas to_parquet() 时应用模式

我有一个非常宽的数据框(20,000 列),主要由 Pandas 中的 float64 列组成。我想将这些列转换为 float32 并写入 Parquet 格式。我这样做是因为这些文件的下游用户是内存有限的小容器。

我目前在 Pandas 中投射,但这在广泛的数据集上非常慢,然后写出镶木地板。是否可以在写入 to_parquet 过程本身时转换类型?下面显示了一个虚拟示例。

import pandas as pd
import numpy as np
import pyarrow
df = pd.DataFrame(np.random.randn(3000, 15000)) # make dummy data set
df.columns = [str(x) for x in list(df)] # make column names string for parquet
df[list(df.loc[:, df.dtypes == float])] = df[list(df.loc[:, df.dtypes == float])].astype('float32') # cast the data
df.to_parquet("myfile.parquet") # write out the df
Run Code Online (Sandbox Code Playgroud)

python pandas pyarrow

5
推荐指数
2
解决办法
1467
查看次数

pyarrow 的内存泄漏?

对于较大文件的解析,我需要循环写入大量的parquet文件。但是,此任务消耗的内存似乎在每次迭代中都会增加,而我希望它保持不变(因为不应在内存中附加任何内容)。这使得扩展变得棘手。

我添加了一个最小的可重现示例,它创建了 10 000 个镶木地板和循环附加到它。

import resource
import random
import string
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))

schema = pa.schema([
                        pa.field('test', pa.string()),
                    ])

resource.setrlimit(resource.RLIMIT_NOFILE, (1000000, 1000000))
number_files = 10000
number_rows_increment = 1000
number_iterations = 100

writers = [pq.ParquetWriter('test_'+id_generator()+'.parquet', schema) for i in range(number_files)]

for i in range(number_iterations):
    for writer in writers:
        table_to_write = pa.Table.from_pandas(
                            pd.DataFrame({'test': [id_generator() for i in range(number_rows_increment)]}),
                            preserve_index=False,
                            schema = …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet pyarrow

5
推荐指数
1
解决办法
1948
查看次数

如何在 macOS Mojave 上使用 Pandas UDF?(由于 [__NSPlaceholderDictionary initialize] 可能已在进行中而失败...)

我正在尝试在 macOS 10.14.3 (macOS Mojave) 上的 Apache Spark 2.4.0 中使用Pandas UDF(又名矢量化 UDF)。

我安装pandaspyarrow使用pip(以及后来pip3)。

每当我执行Spark SQL官方文档中的示例代码时,都会出现以下异常。

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
Run Code Online (Sandbox Code Playgroud)

例外情况如下:

objc[97883]: +[__NSPlaceholderDictionary initialize] may have been in …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark pyspark-sql pyarrow

5
推荐指数
1
解决办法
449
查看次数

使用 Python 将大数据流写入 Parquet

我想用 Python 将大数据流写入镶木地板文件。我的数据很大,我无法将它们保存在内存中并一口气写入它们。

我找到了两个可以在 Parquet 文件上读写的 Python 库(Pyarrow、Fastparquet)。这是我使用 Pyarrow 的解决方案,但如果您知道一个可行的解决方案,我很乐意尝试另一个库:

import pandas as pd
import random
import pyarrow as pa
import pyarrow.parquet as pq


def data_generator():
    # This is a simulation for my generator function
    # It is not allowed to change the nature of this function
    options = ['op1', 'op2', 'op3', 'op4']
    while True:
        dd = {'c1': random.randint(1, 10), 'c2': random.choice(options)}
        yield dd


result_file_address = 'example.parquet'
index = 0

try:
    dic_data = next(data_generator())
    df = pd.DataFrame(dic_data, [index])
    table = …
Run Code Online (Sandbox Code Playgroud)

python streaming bigdata parquet pyarrow

5
推荐指数
0
解决办法
1289
查看次数

使用数据构建表。避免创建数据框

Pandas 数据框很重,所以我想避免这种情况。但我想构造 Pyarrow Table 以便以镶木地板格式存储数据。

我搜索并阅读了文档,并尝试使用 from_array() 但它不起作用。

field=[pa.field('name',pa.string()),pa.field('age',pa.int64())]
arrays=[pa.array(['Tom']),pa.array([23])]
pa.Table.from_arrays(pa.schema(field),arrays)
Run Code Online (Sandbox Code Playgroud)

错误是:名称长度 (1) 与数组长度 (2) 不匹配

pyarrow

5
推荐指数
1
解决办法
1901
查看次数

Pytest 嘲讽补丁 - 如何排除故障?

我在使用模拟补丁时遇到了我认为的常见问题,因为我无法找出正确的补丁。

我有两个问题希望得到帮助。

  1. 关于如何解决以下示例中的特定问题的想法
  2. 并且可能是最重要的关于如何最好地解决“我该修补哪一件事”问题的专业提示/指针/想法/建议。我遇到的问题是,在没有完全了解修补工作原理的情况下,我真的不知道我应该寻找什么并发现自己在玩猜谜游戏。

使用pyarrow它的一个例子目前让我感到痛苦:

我的模块.py

import pyarrow

class HdfsSearch:
    def __init__(self):
        self.fs = self._connect()

    def _connect(self) -> object:
        return pyarrow.hdfs.connect(driver="libhdfs")

    def search(self, path: str):
        return self.fs.ls(path=path)
Run Code Online (Sandbox Code Playgroud)

测试模块.py

import pyarrow
import pytest

from mymodule import HdfsSearch

@pytest.fixture()
def hdfs_connection_fixture(mocker):
    mocker.patch("pyarrow.hdfs.connect")
    yield HdfsSearch()

def test_hdfs_connection(hdfs_connection_fixture):
    pyarrow.hdfs.connect.assert_called_once() # <-- succeeds

def test_hdfs_search(hdfs_connection_fixture):
    hdfs_connection_fixture.search(".")
    pyarrow.hdfs.HadoopFileSystem.ls.assert_called_once() # <-- fails
Run Code Online (Sandbox Code Playgroud)

pytest 输出:

$ python -m pytest --verbose test_module.py
=========================================================================================================== test session starts ============================================================================================================
platform linux -- Python 3.7.4, pytest-5.0.1, py-1.8.0, pluggy-0.12.0 …
Run Code Online (Sandbox Code Playgroud)

python mocking pytest pyarrow

5
推荐指数
1
解决办法
5367
查看次数

PySpark 2.4.5:使用 PandasUDF 时的 IllegalArgumentException

我正在尝试 Pandas UDF 并面临 IllegalArgumentException。我还尝试从 PySpark 文档GroupedData复制示例进行检查,但仍然出现错误。

以下是环境配置

  • 蟒蛇3.7
  • 使用 pip 安装 PySpark==2.4.5
  • 使用 pip 安装 PyArrow==0.16.0
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('int', PandasUDFType.GROUPED_AGG)  
def min_udf(v):
    return v.min()

sorted(gdf.agg(min_udf(df.age)).collect())  
Run Code Online (Sandbox Code Playgroud)

输出

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-66-94a0a39bfe30> in <module>
----> 1 sorted(gdf.agg(min_udf(sample_data.sqft)).collect())

~/Desktop/test/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
    532         """
    533         with SCCallSiteSync(self._sc) as css:
--> 534             sock_info = self._jdf.collectToPython()
    535         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    536 

~/Desktop/test/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, …
Run Code Online (Sandbox Code Playgroud)

python pandas apache-spark pyspark pyarrow

5
推荐指数
1
解决办法
771
查看次数

自 apache arrow 1.0.1 发布以来用于长期存储的 Feather 格式

由于在Feather Github 中搜索问题,以及 stackoverflow 中的问题,例如Feather和 parquet 之间有什么区别?,由于Apache Arrow版本为 0.xx,不推荐 Feather 格式作为长期存储,并且由于不断发布新版本而被认为是不稳定的。

我的问题是,从当前的 Apache Arrow 版本 1.0.1 开始,这种情况是否有所改变?Feather 被认为可以稳定地用作长期存储吗?

python dataframe pandas feather pyarrow

5
推荐指数
1
解决办法
471
查看次数

如何修复镶木地板文件的python熊猫中的时间戳解释

我有一些带有时间戳的 spark(scala) 数据帧/表,它们来自我们的 DHW,有时会使用一些高水印。

我想在 python 中使用 Pandas 处理这些数据,所以我将它们写为 spark 中的镶木地板文件,然后用 Pandas 再次读取。问题是 pandas/pyarrow 无法处理时间戳。这些转换为dateTime64[ns],女巫可以容纳的日期范围有限。所以一些时间戳(尤其是所有高水位标记)会得到错误的条目。

我如何强制熊猫将时间戳解释dateTime[mu]为例如。或者将高(和低)水印设置为 NAN 而不是仅使用错误的转换值?

这是一个最小的代码示例

火花:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._


val df_spark = Seq(
  (1, "1050-01-01 23:00:01"), 
  (2, "2014-11-30 12:40:32"), 
  (3, "2016-12-29 09:54:00"), 
  (4, "2500-01-01 00:00:00")).toDF("id", "date-string")
  .withColumn("valid_to", to_timestamp(col("date-string")))

df_spark.repartition(1).write.parquet("timestamptest")
df_spark.show
+---+-------------------+-------------------+
| id|        date-string|           valid_to|
+---+-------------------+-------------------+
|  1|1050-01-01 23:00:01|1050-01-01 23:00:01|
|  2|2014-11-30 12:40:32|2014-11-30 12:40:32|
|  3|2016-12-29 09:54:00|2016-12-29 09:54:00|
|  4|2500-01-01 00:00:00|2500-01-01 00:00:00|
+---+-------------------+-------------------+

Run Code Online (Sandbox Code Playgroud)

在 Python 中读取: …

python timestamp pandas parquet pyarrow

5
推荐指数
0
解决办法
754
查看次数

ArrowTypeError: 没有通过 numpy.dtype 对象','类型为 int32 的 X 列的转换失败

问题

我正在尝试将数据框保存为 Databricks 上的镶木地板文件,得到 ArrowTypeError。

Databricks 运行时版本:7.6 ML(包括 Apache Spark 3.0.1、Scala 2.12)

日志跟踪

ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column inv_yr with type int32')
Run Code Online (Sandbox Code Playgroud)

python numpy pandas databricks pyarrow

5
推荐指数
1
解决办法
2104
查看次数