使用内部定义的udf模块冻结pyspark工作 - 解释?

Vai*_*nas 8 user-defined-functions apache-spark-sql pyspark

情况如下:

我们有一个模块,我们定义一些返回的函数pyspark.sql.DataFrame(DF).为了获得这些DF,我们使用pyspark.sql.functions.udf在同一文件或辅助模块中定义的DF .当我们实际为pyspark编写作业以执行时,我们只从模块导入函数(我们提供.zip文件--py-files),然后将数据帧保存到hdfs.

问题在于,当我们这样做时,该udf功能会冻结我们的工作.我们发现的令人讨厌的修复是udf在作业中定义函数并将它们提供给我们模块中的导入函数.我在这里找到的另一个修复是定义一个类:

from pyspark.sql.functions import udf


class Udf(object):
    def __init__(s, func, spark_type):
        s.func, s.spark_type = func, spark_type

    def __call__(s, *args):
        return udf(s.func, s.spark_type)(*args)
Run Code Online (Sandbox Code Playgroud)

然后使用它来定义我udf的模块.这有效!

任何人都可以解释为什么我们首先遇到这个问题?为什么这个修复(具有类定义的最后一个)有效?

附加信息:PySpark 2.1.0.在群集模式下在纱线上部署作业.

谢谢!

gre*_*nie 2

您上面发布的链接的公认答案是:“我的解决办法是避免创建 UDF,直到 Spark 运行,因此存在活动的 SparkContext。” 看来您的问题与序列化 UDF 有关。

确保辅助类中的 UDF 函数是静态方法或全局函数。在您在其他地方导入的公共函数内,您可以定义 udf。

class Helperclass(object):
  @staticmethod
  def my_udf_todo(...):
     ...

  def public_function_that_is_imported_elsewhere(...):
     todo_udf = udf(Helperclass.my_udf_todo, RETURN_SCHEMA)
     ...
Run Code Online (Sandbox Code Playgroud)