MJe*_*emy 5 python apache-spark pyspark
我有一项工作需要在分区的 Spark 数据帧上运行,该过程如下所示:
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
Run Code Online (Sandbox Code Playgroud)
结果rdd是pandas.dataframe,
type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame
Run Code Online (Sandbox Code Playgroud)
并rdd.glom().collect()返回结果如下:
[[df1], [df2], ...]
Run Code Online (Sandbox Code Playgroud)
现在我希望将结果转换为spark dataframe,我所做的方式是:
sp = None
for i, partition in enumerate(rdd.collect()):
if i == 0:
sp = spark.createDataFrame(partition)
else:
sp = sp.union(spark.createDataFrame(partition))
return sp
Run Code Online (Sandbox Code Playgroud)
但是,结果可能很大,rdd.collect()可能会超出驱动程序的内存,所以我需要避免collect()操作。有办法解决这个问题吗?
提前致谢!
如果你想继续使用 rdd api。mapPartitions接受一种类型的迭代器并期望另一种类型的迭代器作为结果。pandas_df 不是mapPartitions可以直接处理的迭代器类型。如果你必须使用 pandas api,你可以从创建一个合适的生成器pandas.iterrows
这样,您的总体mapPartitions结果将是行类型的单个 rdd,而不是 pandas 数据帧的 rdd。这样的 rdd 可以通过动态模式发现无缝转换为数据帧
from pyspark.sql import Row
def some_fuction(iter):
pandas_df = some_pandas_result(iter)
for index, row in pandas_df.iterrows():
yield Row(id=index, foo=row['foo'], bar=row['bar'])
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7801 次 |
| 最近记录: |