s1n*_*bad 8 python pyspark python-polars
我想知道如何将 Spark 数据帧转换为 Polars 数据帧。
假设我在 PySpark 上有这段代码:
df = spark.sql('''select * from tmp''')
Run Code Online (Sandbox Code Playgroud)
我可以使用 轻松将其转换为 pandas 数据框.toPandas。极坐标中有类似的东西吗,因为我需要获取极坐标数据帧以进行进一步处理?
rit*_*e46 24
Pyspark 使用箭头转换为 pandas。Polars 是箭头存储器的抽象。因此,我们可以劫持 Spark 内部使用的 API 创建箭头数据并使用它来创建极坐标DataFrame。
给定一个 Spark 上下文,我们可以这样写:
\nimport pyarrow as pa\nimport polars as pl\n\nsql_context = SQLContext(spark)\n\ndata = [(\'James\',[1, 2]),]\nspark_df = sql_context.createDataFrame(data=data, schema = ["name","properties"])\n\ndf = pl.from_arrow(pa.Table.from_batches(spark_df._collect_as_arrow()))\n\nprint(df)\nRun Code Online (Sandbox Code Playgroud)\nshape: (1, 2)\n\xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 name \xe2\x94\x86 properties \xe2\x94\x82\n\xe2\x94\x82 --- \xe2\x94\x86 --- \xe2\x94\x82\n\xe2\x94\x82 str \xe2\x94\x86 list[i64] \xe2\x94\x82\n\xe2\x95\x9e\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xa1\n\xe2\x94\x82 James \xe2\x94\x86 [1, 2] \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\nRun Code Online (Sandbox Code Playgroud)\ntoPandas这实际上比它本身提供的要快spark,因为它节省了额外的副本。
toPandas()将导致此序列化/复制步骤:
spark-memory -> arrow-memory -> pandas-memory
根据提供的查询,我们有:
\nspark-memory -> arrow/polars-memory
请注意,Polars 是一个单机多线程 DataFrame 库。相比之下,Spark 是一个多机多线程 DataFrame 库。因此 Spark 将 DataFrame 分布在多台机器上。
如果您的数据集需要此功能,因为 DataFrame 不适合单台机器,那么_collect_as_arrow、to_dict和from_pandas不适合您。
如果您想使用一些 Polars 代码(Spark -> Polars -> Spark)转换 Spark DataFrame,您可以使用以下方法来实现分布式和可扩展性mapInArrow:
import pyarrow as pa
import polars as pl
from typing import Iterator
# The example data as a Spark DataFrame
data = [(1, 1.0), (2, 2.0)]
spark_df = spark.createDataFrame(data=data, schema = ['id', 'value'])
spark_df.show()
# Define your transformation on a Polars DataFrame
# Here we multply the 'value' column by 2
def polars_transform(df: pl.DataFrame) -> pl.DataFrame:
return df.select([
pl.col('id'),
pl.col('value') * 2
])
# Converts a part of the Spark DataFrame into a Polars DataFrame and call `polars_transform` on it
def arrow_transform(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
# Transform a single RecordBatch so data fit into memory
# Increase spark.sql.execution.arrow.maxRecordsPerBatch if batches are too small
for batch in iter:
polars_df = pl.from_arrow(pa.Table.from_batches([batch]))
polars_df_2 = polars_transform(polars_df)
for b in polars_df_2.to_arrow().to_batches():
yield b
# Map the Spark DataFrame to Arrow, then to Polars, run the the `polars_transform` on it,
# and transform everything back to Spark DataFrame, all distributed and scalable
spark_df_2 = spark_df.mapInArrow(arrow_transform, schema='id long, value double')
spark_df_2.show()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8079 次 |
| 最近记录: |