在大型PySpark数据框的每一行中应用函数?

mom*_*ind 5 large-scale pyspark

我有一个大的数据框(约3000万行)。我有一个功能f。要做的事情f是遍历每一行,检查一些逻辑并将输出馈送到字典中。该功能需要逐行执行。

我试过了:

dic = dict() for row in df.rdd.collect(): f(row, dic)

但是我总是遇到错误OOM。我将Docker的内存设置为8GB。

如何有效开展业务?

非常感谢

Pre*_*rem 7

您可以尝试下面的方法并告诉我们它是否适合您吗?

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType

#sample data
df = sc.parallelize([
    ['a', 'b'],
    ['c', 'd'],
    ['e', 'f']
]).toDF(('col1', 'col2'))

#add logic to create dictionary element using rows of the dataframe    
def add_to_dict(l):
    d = {}
    d[l[0]] = l[1]
    return d
add_to_dict_udf = udf(add_to_dict, MapType(StringType(), StringType()))
#struct is used to pass rows of dataframe
df = df.withColumn("dictionary_item", add_to_dict_udf(struct([df[x] for x in df.columns])))
df.show()

#list of dictionary elements
dictionary_list = [i[0] for i in df.select('dictionary_item').collect()]
print dictionary_list
Run Code Online (Sandbox Code Playgroud)

输出是:

[{u'a': u'b'}, {u'c': u'd'}, {u'e': u'f'}]
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助!


Tw *_*Nus 5

通过使用collect您将所有数据从 Spark Executor 中提取到您的驱动程序中。你真的应该避免这种情况,因为它使使用 Spark 变得毫无意义(在这种情况下你可以只使用普通的 python)。

你能做什么: