Vai*_*nas 8 user-defined-functions apache-spark-sql pyspark
情况如下:
我们有一个模块,我们定义一些返回的函数pyspark.sql.DataFrame(DF).为了获得这些DF,我们使用pyspark.sql.functions.udf在同一文件或辅助模块中定义的DF .当我们实际为pyspark编写作业以执行时,我们只从模块导入函数(我们提供.zip文件--py-files),然后将数据帧保存到hdfs.
问题在于,当我们这样做时,该udf功能会冻结我们的工作.我们发现的令人讨厌的修复是udf在作业中定义函数并将它们提供给我们模块中的导入函数.我在这里找到的另一个修复是定义一个类:
from pyspark.sql.functions import udf
class Udf(object):
def __init__(s, func, spark_type):
s.func, s.spark_type = func, spark_type
def __call__(s, *args):
return udf(s.func, s.spark_type)(*args)
Run Code Online (Sandbox Code Playgroud)
然后使用它来定义我udf的模块.这有效!
任何人都可以解释为什么我们首先遇到这个问题?为什么这个修复(具有类定义的最后一个)有效?
附加信息:PySpark 2.1.0.在群集模式下在纱线上部署作业.
谢谢!
您上面发布的链接的公认答案是:“我的解决办法是避免创建 UDF,直到 Spark 运行,因此存在活动的 SparkContext。” 看来您的问题与序列化 UDF 有关。
确保辅助类中的 UDF 函数是静态方法或全局函数。在您在其他地方导入的公共函数内,您可以定义 udf。
class Helperclass(object):
@staticmethod
def my_udf_todo(...):
...
def public_function_that_is_imported_elsewhere(...):
todo_udf = udf(Helperclass.my_udf_todo, RETURN_SCHEMA)
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
507 次 |
| 最近记录: |