How to import a pyspark UDF into the main class

ayp*_*lam 3 python user-defined-functions apache-spark pyspark

I have two files. functions.py defines a function and creates a pyspark udf from it. main.py tries to import that udf. However, main.py does not seem to be able to access the function in functions.py.

functions.py:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def do_something(x):
    return x + 'hello'

sample_udf = udf(lambda x: do_something(x), StringType())

main.py:

from pyspark.sql.functions import col
from functions import sample_udf, do_something

df = spark.read.load(file)
df = df.withColumn("sample", sample_udf(col("text")))

This results in the following error:

17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'do_something'

If I bypass the do_something function and just put its body inside the udf, e.g. udf(lambda x: x + ' hello', StringType()), the UDF imports fine. But my actual function is quite a bit longer, and I would prefer to encapsulate it in a separate function. What is the right way to achieve this?

小智 5

Just adding this as an answer:

Add your .py file to the SparkContext so that it is available to the executors.

sc.addPyFile("functions.py")
from functions import sample_udf 
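For context, a complete driver script using this approach might look like the following sketch (the app name, file path, and column name are placeholders, not from the original post):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("udf-import-demo").getOrCreate()
sc = spark.sparkContext

# Ship functions.py to the executors before importing from it
sc.addPyFile("functions.py")
from functions import sample_udf

df = spark.read.load("input.parquet")  # placeholder path
df = df.withColumn("sample", sample_udf(col("text")))
df.show()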

Here is my test notebook:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3669221609244155/3140647912908320/868274901052987/latest.html

Thanks, Charles.


Xav*_*ton 5

I think a cleaner solution would be to use the udf decorator to define your udf function:

import pyspark.sql.functions as F

# Without an explicit returnType, F.udf defaults to StringType
@F.udf
def sample_udf(x):
    return x + 'hello'

With this solution, the udf does not reference any other function, and you do not need sc.addPyFile in your main code.

from pyspark.sql.functions import col
from functions import sample_udf

df = spark.read.load(file)
df = df.withColumn("sample", sample_udf(col("text")))
# It works :)
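One caveat worth noting (my own addition, not from the answer): if the decorated udf calls a separate module-level helper, the original pickling problem comes back, because only the udf function itself is serialized by value. One way around this is to nest the helper inside the decorated function so it gets shipped along with it, as in this sketch:

import pyspark.sql.functions as F

@F.udf
def sample_udf(x):
    # The helper is a local function, so it is serialized together
    # with sample_udf instead of being looked up in the functions
    # module on the executors.
    def do_something(y):
        return y + 'hello'
    return do_something(x)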

On some older versions of Spark, the decorator does not support typed udfs, so you may have to define a custom decorator like this:

import pyspark.sql.functions as F
import pyspark.sql.types as t

# Custom udf decorator that accepts a return type
def udf_typed(returntype=t.StringType()):
    def _typed_udf_wrapper(func):
        return F.udf(func, returntype)
    return _typed_udf_wrapper

@udf_typed(t.IntegerType())
def my_udf(x):
    return int(x)
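On more recent Spark versions (2.3 and later, if I recall correctly) this workaround is unnecessary, since F.udf itself accepts the return type when used as a decorator:

import pyspark.sql.functions as F
import pyspark.sql.types as t

# F.udf called with only a returnType returns a decorator
@F.udf(returnType=t.IntegerType())
def my_udf(x):
    return int(x)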