如何使用pySpark子模块中定义的UDF?

ldc*_*ldc 5 python apache-spark pyspark

我想使用在子模块中定义的 PySpark UDF module.foo,我已将其添加到我的SparkContext. 当我尝试时,PySpark 抛出一个ModuleNotFoundErrorfor the main module module

如果我将子模块移出主模块,它会按预期工作,但我更愿意保持结构不变。

任何想法 ?

准确地说,我的代码结构是

project/
|- main.py
|- module/
   |- __init__.py
   |- foo.py
Run Code Online (Sandbox Code Playgroud)

主要.py

project/
|- main.py
|- module/
   |- __init__.py
   |- foo.py
Run Code Online (Sandbox Code Playgroud)

foo.py

import module.foo
spark = SparkSession.builder \
            .appName(appName) \
            .config(conf=sConf) \
            .enableHiveSupport() \
            .getOrCreate()

spark.sparkContext.addPyFile('some_path/project/module/foo.py')

df = module.foo.bar(spark)
Run Code Online (Sandbox Code Playgroud)

我的错误是

from pyspark.sql.types      import StringType
from pyspark.sql.functions  import udf

def hello():
    return "Hello World"

def bar(spark):
    hello_udf = udf(hello, StringType())
    df = (spark.sql('SELECT * FROM pokemons')
               .withColumn('hello', hello_udf()))
    return df.toPandas()

Run Code Online (Sandbox Code Playgroud)

Dou*_*oug 6

定义一个局部函数,如下所示:

from pyspark.sql.types      import StringType
from pyspark.sql.functions  import udf

def bar(spark):
    def hello():
        return "Hello World"

    hello_udf = udf(hello, StringType())
    df = (spark.sql('SELECT * FROM pokemons')
               .withColumn('hello', hello_udf()))
    return df.toPandas()
Run Code Online (Sandbox Code Playgroud)

许多人对这个问题有意见,因为这是一个“偶尔”的错误。

实际发生的情况是,当您bar 从 main.py调用时,您实际上正在运行该函数module.foo.bar

因此,当您尝试将函数注册为 udf 时,您实际上是在相对于入口点hello注册函数 hello 。

这就是为什么如果您将代码复制到其中main.py就会正常运行。

- main.py
-- def hello <-- the absolute path of hello relative to main is 'hello' -> WORKS OK

- foo.py
-- def hello <-- when running from foo.py, abs path is hello -> WORKS OK.
             <-- when running from main.py, abs path is foo.hello -> ModuleNotFoundError.
Run Code Online (Sandbox Code Playgroud)

由于foo.hello未在工作人员上注册,这将导致错误。

当您创建本地函数时,例如:

def foo():
    def tmp(): # <-- This is a local function, so it has no module path, so it works.
        ... 
Run Code Online (Sandbox Code Playgroud)

老实说,这似乎是 pyspark 中的一个错误。