使用 Python 的 reduce() 加入多个 PySpark DataFrames

Eri*_*ith 5 python python-3.x pyspark spark-dataframe pyspark-sql

有谁知道为什么functools.reduce()在加入多个 PySpark DataFrames 时使用 Python3会导致比使用for循环迭代加入相同的 DataFrames 时更差的性能?具体来说,这会导致大量减速,然后出现内存不足错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)
Run Code Online (Sandbox Code Playgroud)

而这个没有:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)
Run Code Online (Sandbox Code Playgroud)

任何想法将不胜感激。谢谢!

Ale*_*lex 1

原因之一是reduce或fold通常在功能上是纯粹的:每个累加操作的结果不会写入内存的同一部分,而是写入新的内存块。

原则上,垃圾收集器可以在每次累加后释放前一个块,但如果不这样做,您将为累加器的每个更新版本分配内存。