如何将 Spark 数据帧转换为 Polars 数据帧?

s1n*_*bad 8 python pyspark python-polars

我想知道如何将 Spark 数据帧转换为 Polars 数据帧。

假设我在 PySpark 上有这段代码:

df = spark.sql('''select * from tmp''')
Run Code Online (Sandbox Code Playgroud)

我可以使用 轻松将其转换为 pandas 数据框.toPandas。极坐标中有类似的东西吗,因为我需要获取极坐标数据帧以进行进一步处理?

rit*_*e46 24

语境

\n

Pyspark 使用箭头转换为 pandas。Polars 是箭头存储器的抽象。因此,我们可以劫持 Spark 内部使用的 API 创建箭头数据并使用它来创建极坐标DataFrame

\n

总长DR

\n

给定一个 Spark 上下文,我们可以这样写:

\n
import pyarrow as pa\nimport polars as pl\n\nsql_context = SQLContext(spark)\n\ndata = [(\'James\',[1, 2]),]\nspark_df = sql_context.createDataFrame(data=data, schema = ["name","properties"])\n\ndf = pl.from_arrow(pa.Table.from_batches(spark_df._collect_as_arrow()))\n\nprint(df)\n
Run Code Online (Sandbox Code Playgroud)\n
shape: (1, 2)\n\xe2\x94\x8c\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xac\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 name  \xe2\x94\x86 properties \xe2\x94\x82\n\xe2\x94\x82 ---   \xe2\x94\x86 ---        \xe2\x94\x82\n\xe2\x94\x82 str   \xe2\x94\x86 list[i64]  \xe2\x94\x82\n\xe2\x95\x9e\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xaa\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\x90\xe2\x95\xa1\n\xe2\x94\x82 James \xe2\x94\x86 [1, 2]     \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n
Run Code Online (Sandbox Code Playgroud)\n

序列化步骤

\n

toPandas这实际上比它本身提供的要快spark,因为它节省了额外的副本。

\n

toPandas()将导致此序列化/复制步骤:

\n

spark-memory -> arrow-memory -> pandas-memory

\n

根据提供的查询,我们有:

\n

spark-memory -> arrow/polars-memory

\n


Enr*_*coM 7

Polars 不是分布式的,而 Spark 是分布式的

请注意,Polars 是一个单机多线程 DataFrame 库。相比之下,Spark 是一个多机多线程 DataFrame 库。因此 Spark 将 DataFrame 分布在多台机器上。

使用可扩展的 Polars 代码转换 Spark DataFrame

如果您的数据集需要此功能,因为 DataFrame 不适合单台机器,那么_collect_as_arrowto_dictfrom_pandas不适合您。

如果您想使用一些 Polars 代码(Spark -> Polars -> Spark)转换 Spark DataFrame,您可以使用以下方法来实现分布式和可扩展性mapInArrow

import pyarrow as pa
import polars as pl

from typing import Iterator


# The example data as a Spark DataFrame
data = [(1, 1.0), (2, 2.0)]
spark_df = spark.createDataFrame(data=data, schema = ['id', 'value'])
spark_df.show()


# Define your transformation on a Polars DataFrame
# Here we multply the 'value' column by 2
def polars_transform(df: pl.DataFrame) -> pl.DataFrame:
  return df.select([
    pl.col('id'),
    pl.col('value') * 2
  ])


# Converts a part of the Spark DataFrame into a Polars DataFrame and call `polars_transform` on it
def arrow_transform(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
  # Transform a single RecordBatch so data fit into memory
  # Increase spark.sql.execution.arrow.maxRecordsPerBatch if batches are too small
  for batch in iter:
    polars_df = pl.from_arrow(pa.Table.from_batches([batch]))
    polars_df_2 = polars_transform(polars_df)
    for b in polars_df_2.to_arrow().to_batches():
      yield b


# Map the Spark DataFrame to Arrow, then to Polars, run the the `polars_transform` on it,
# and transform everything back to Spark DataFrame, all distributed and scalable
spark_df_2 = spark_df.mapInArrow(arrow_transform, schema='id long, value double')
spark_df_2.show()
Run Code Online (Sandbox Code Playgroud)