pySpark将mapPartitions的结果转换为spark DataFrame

MJe*_*emy 5 python apache-spark pyspark

我有一项工作需要在分区的 Spark 数据帧上运行,该过程如下所示:

rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
Run Code Online (Sandbox Code Playgroud)

结果rddpandas.dataframe,

type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame
Run Code Online (Sandbox Code Playgroud)

rdd.glom().collect()返回结果如下:

[[df1], [df2], ...]
Run Code Online (Sandbox Code Playgroud)

现在我希望将结果转换为spark dataframe,我所做的方式是:

sp = None
for i, partition in enumerate(rdd.collect()):
    if i == 0:
        sp = spark.createDataFrame(partition)
    else:
        sp = sp.union(spark.createDataFrame(partition))

return sp

Run Code Online (Sandbox Code Playgroud)

但是,结果可能很大,rdd.collect()可能会超出驱动程序的内存,所以我需要避免collect()操作。有办法解决这个问题吗?

提前致谢!

dre*_*-hh 3

如果你想继续使用 rdd api。mapPartitions接受一种类型的迭代器并期望另一种类型的迭代器作为结果。pandas_df 不是mapPartitions可以直接处理的迭代器类型。如果你必须使用 pandas api,你可以从创建一个合适的生成器pandas.iterrows

这样,您的总体mapPartitions结果将是行类型的单个 rdd,而不是 pandas 数据帧的 rdd。这样的 rdd 可以通过动态模式发现无缝转换为数据帧

from pyspark.sql import Row

def some_fuction(iter):
  pandas_df = some_pandas_result(iter)
  for index, row in pandas_df.iterrows():
     yield Row(id=index, foo=row['foo'], bar=row['bar'])


rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
Run Code Online (Sandbox Code Playgroud)

  • 感谢您花时间深入研究这个问题。我按照您的建议解决了这个问题:转换为“Row”,然后转换为“createDataFrame”。我应用的代码是将“pandas数据框”的每一行附加到“Row”对象列表中:“row_list.append(Row(**row_dict))” (2认同)