UDF中的Pyspark和局部变量

hol*_*est 5 python user-defined-functions apache-spark pyspark

当我定义局部变量(例如大量复杂对象)并在pyspark的UDF中使用它时,会发生什么。让我以此为例:

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))
Run Code Online (Sandbox Code Playgroud)

它会自动播放吗?还是节点与主机进行通信以每次获取其数据?我使用这种方法会有哪些性能损失?有更好的吗?(考虑到huge_list每次应用UDF都从头开始构建会更糟)

Ale*_*rov 2

查看代码,可以看到发生以下情况:每个 udf调用一次此函数,该函数通过腌制CloudPickleSerializer函数中的可调用项。它还具有将 pickled callable 的大小与 1Mb 的硬编码阈值进行比较的逻辑。如果大小较大,则广播 pickled 命令,并pyspark.broadcast.Broadcast改为 pickle 类型的对象(其序列化值显然非常短,因为该对象几乎是一个引用)。读取 pickled callable 的地方似乎就在这里。我的理解是,执行者为每个新任务的执行从头开始创建一个Python进程。对于每个使用的 udf,它将获得 pickled 命令并取消它,或者(对于广播)需要从 JVM 获取广播的值并取消它。

据我了解,如果pyspark.broadcast.Broadcast此处创建了对象,则所有执行程序都将保留该执行程序将创建的 python work.py 进程的所有未来查找的值。

因此,如果你想回答某个函数是否会被广播的问题,你可以重复 pyspark 所做的相同操作,看看 pickled 对象是否大于 1Mb,例如:

from pyspark.serializers import CloudPickleSerializer
ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda : x)
print(len(v)) # 607434 - less than 1Mb, won't be broadcast
Run Code Online (Sandbox Code Playgroud)

关于替代方法,我认为我看到的唯一替代方法(除了每次调用 udf'ed 函数时创建新对象,这已经被解释为太昂贵)是创建一个模块,该模块将在导入期间创建有问题的对象。在这种情况下,该对象将为每个任务执行创建一次。因此,这几乎为您提供了一个选择:(a) 如果CloudPickleSerializer您只允许 udf 函数捕获对象,则每次任务执行时反序列化对象一次;或者 (b) 通过导入模块在每次任务执行时创建对象一次。更快的是一个单独的问题 - 但我想答案可能取决于所讨论的对象。在每种情况下,它似乎都相当容易测量。