我有一个非常宽的数据框(20,000 列),主要由 Pandas 中的 float64 列组成。我想将这些列转换为 float32 并写入 Parquet 格式。我这样做是因为这些文件的下游用户是内存有限的小容器。
我目前在 Pandas 中投射,但这在广泛的数据集上非常慢,然后写出镶木地板。是否可以在写入 to_parquet 过程本身时转换类型?下面显示了一个虚拟示例。
import pandas as pd
import numpy as np
import pyarrow
df = pd.DataFrame(np.random.randn(3000, 15000)) # make dummy data set
df.columns = [str(x) for x in list(df)] # make column names string for parquet
df[list(df.loc[:, df.dtypes == float])] = df[list(df.loc[:, df.dtypes == float])].astype('float32') # cast the data
df.to_parquet("myfile.parquet") # write out the df
Run Code Online (Sandbox Code Playgroud) 对于较大文件的解析,我需要循环写入大量的parquet文件。但是,此任务消耗的内存似乎在每次迭代中都会增加,而我希望它保持不变(因为不应在内存中附加任何内容)。这使得扩展变得棘手。
我添加了一个最小的可重现示例,它创建了 10 000 个镶木地板和循环附加到它。
import resource
import random
import string
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
schema = pa.schema([
pa.field('test', pa.string()),
])
resource.setrlimit(resource.RLIMIT_NOFILE, (1000000, 1000000))
number_files = 10000
number_rows_increment = 1000
number_iterations = 100
writers = [pq.ParquetWriter('test_'+id_generator()+'.parquet', schema) for i in range(number_files)]
for i in range(number_iterations):
for writer in writers:
table_to_write = pa.Table.from_pandas(
pd.DataFrame({'test': [id_generator() for i in range(number_rows_increment)]}),
preserve_index=False,
schema = …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 macOS 10.14.3 (macOS Mojave) 上的 Apache Spark 2.4.0 中使用Pandas UDF(又名矢量化 UDF)。
我安装pandas并pyarrow使用pip(以及后来pip3)。
每当我执行Spark SQL官方文档中的示例代码时,都会出现以下异常。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
def multiply_func(a, b):
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
Run Code Online (Sandbox Code Playgroud)
例外情况如下:
objc[97883]: +[__NSPlaceholderDictionary initialize] may have been in …Run Code Online (Sandbox Code Playgroud) 我想用 Python 将大数据流写入镶木地板文件。我的数据很大,我无法将它们保存在内存中并一口气写入它们。
我找到了两个可以在 Parquet 文件上读写的 Python 库(Pyarrow、Fastparquet)。这是我使用 Pyarrow 的解决方案,但如果您知道一个可行的解决方案,我很乐意尝试另一个库:
import pandas as pd
import random
import pyarrow as pa
import pyarrow.parquet as pq
def data_generator():
# This is a simulation for my generator function
# It is not allowed to change the nature of this function
options = ['op1', 'op2', 'op3', 'op4']
while True:
dd = {'c1': random.randint(1, 10), 'c2': random.choice(options)}
yield dd
result_file_address = 'example.parquet'
index = 0
try:
dic_data = next(data_generator())
df = pd.DataFrame(dic_data, [index])
table = …Run Code Online (Sandbox Code Playgroud) Pandas 数据框很重,所以我想避免这种情况。但我想构造 Pyarrow Table 以便以镶木地板格式存储数据。
我搜索并阅读了文档,并尝试使用 from_array() 但它不起作用。
field=[pa.field('name',pa.string()),pa.field('age',pa.int64())]
arrays=[pa.array(['Tom']),pa.array([23])]
pa.Table.from_arrays(pa.schema(field),arrays)
Run Code Online (Sandbox Code Playgroud)
错误是:名称长度 (1) 与数组长度 (2) 不匹配
我在使用模拟补丁时遇到了我认为的常见问题,因为我无法找出正确的补丁。
我有两个问题希望得到帮助。
使用pyarrow它的一个例子目前让我感到痛苦:
import pyarrow
class HdfsSearch:
def __init__(self):
self.fs = self._connect()
def _connect(self) -> object:
return pyarrow.hdfs.connect(driver="libhdfs")
def search(self, path: str):
return self.fs.ls(path=path)
Run Code Online (Sandbox Code Playgroud)
import pyarrow
import pytest
from mymodule import HdfsSearch
@pytest.fixture()
def hdfs_connection_fixture(mocker):
mocker.patch("pyarrow.hdfs.connect")
yield HdfsSearch()
def test_hdfs_connection(hdfs_connection_fixture):
pyarrow.hdfs.connect.assert_called_once() # <-- succeeds
def test_hdfs_search(hdfs_connection_fixture):
hdfs_connection_fixture.search(".")
pyarrow.hdfs.HadoopFileSystem.ls.assert_called_once() # <-- fails
Run Code Online (Sandbox Code Playgroud)
$ python -m pytest --verbose test_module.py
=========================================================================================================== test session starts ============================================================================================================
platform linux -- Python 3.7.4, pytest-5.0.1, py-1.8.0, pluggy-0.12.0 …Run Code Online (Sandbox Code Playgroud) 我正在尝试 Pandas UDF 并面临 IllegalArgumentException。我还尝试从 PySpark 文档GroupedData复制示例进行检查,但仍然出现错误。
以下是环境配置
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def min_udf(v):
return v.min()
sorted(gdf.agg(min_udf(df.age)).collect())
Run Code Online (Sandbox Code Playgroud)
输出
Py4JJavaError Traceback (most recent call last)
<ipython-input-66-94a0a39bfe30> in <module>
----> 1 sorted(gdf.agg(min_udf(sample_data.sqft)).collect())
~/Desktop/test/venv/lib/python3.7/site-packages/pyspark/sql/dataframe.py in collect(self)
532 """
533 with SCCallSiteSync(self._sc) as css:
--> 534 sock_info = self._jdf.collectToPython()
535 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
536
~/Desktop/test/venv/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, …Run Code Online (Sandbox Code Playgroud) 由于在Feather Github 中搜索问题,以及 stackoverflow 中的问题,例如Feather和 parquet 之间有什么区别?,由于Apache Arrow版本为 0.xx,不推荐 Feather 格式作为长期存储,并且由于不断发布新版本而被认为是不稳定的。
我的问题是,从当前的 Apache Arrow 版本 1.0.1 开始,这种情况是否有所改变?Feather 被认为可以稳定地用作长期存储吗?
我有一些带有时间戳的 spark(scala) 数据帧/表,它们来自我们的 DHW,有时会使用一些高水印。
我想在 python 中使用 Pandas 处理这些数据,所以我将它们写为 spark 中的镶木地板文件,然后用 Pandas 再次读取。问题是 pandas/pyarrow 无法处理时间戳。这些转换为dateTime64[ns],女巫可以容纳的日期范围有限。所以一些时间戳(尤其是所有高水位标记)会得到错误的条目。
我如何强制熊猫将时间戳解释dateTime[mu]为例如。或者将高(和低)水印设置为 NAN 而不是仅使用错误的转换值?
这是一个最小的代码示例:
火花:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val df_spark = Seq(
(1, "1050-01-01 23:00:01"),
(2, "2014-11-30 12:40:32"),
(3, "2016-12-29 09:54:00"),
(4, "2500-01-01 00:00:00")).toDF("id", "date-string")
.withColumn("valid_to", to_timestamp(col("date-string")))
df_spark.repartition(1).write.parquet("timestamptest")
df_spark.show
+---+-------------------+-------------------+
| id| date-string| valid_to|
+---+-------------------+-------------------+
| 1|1050-01-01 23:00:01|1050-01-01 23:00:01|
| 2|2014-11-30 12:40:32|2014-11-30 12:40:32|
| 3|2016-12-29 09:54:00|2016-12-29 09:54:00|
| 4|2500-01-01 00:00:00|2500-01-01 00:00:00|
+---+-------------------+-------------------+
Run Code Online (Sandbox Code Playgroud)
在 Python 中读取时: …
我正在尝试将数据框保存为 Databricks 上的镶木地板文件,得到 ArrowTypeError。
Databricks 运行时版本:7.6 ML(包括 Apache Spark 3.0.1、Scala 2.12)
ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column inv_yr with type int32')
Run Code Online (Sandbox Code Playgroud)