ran*_*ith 5 python broadcast user-defined-functions apache-spark pyspark
我在单独的 py 文件中设置了广播变量,然后将其导入到包含 UDF 的文件中。但是当我尝试在 UDF 中使用此变量时,我发现广播变量在某些转换函数的范围内使用时未初始化(NoneType)Dataframe。这是支持代码。
广播模型的utils.py定义如下,
class Broadcaster(object):
_map = {}
_bv = None
@staticmethod
def set_item(k, v):
Broadcaster._map[k] = v
@staticmethod
def broadcast(sc):
Broadcaster._bv = sc.broadcast(Broadcaster._map)
@staticmethod
def get_item(k):
val = Broadcaster._bv.value
return val.get(k)
Run Code Online (Sandbox Code Playgroud)
这样做的原因是提供一个可以在广播之前设置多个 k,v 组合的接口。这意味着,在我的 中main.py,我现在可以Broadcaster.set_item(k, v)多次调用,然后最终调用Broadcaster.broadcast(sc),效果很好。但现在,我想在 UDF 中使用这个广播变量,它位于一个单独的文件中(例如udfs.py)。请注意,这些 UDF 在我的处理中使用Dataframe。下面是一个示例 UDF,
def my_udf(col):
bv = Broadcaster._bv.value #this throws exception :-(
#more code
Run Code Online (Sandbox Code Playgroud)
在我的udfs.py文件中,访问Broadcaster._bv.value. 只是当在 udf 中使用并且从内部调用此 udf 时Dataframe,我得到的结果NoneType没有value异常。基本上工作节点无法访问广播变量。为什么不能在跨文件中使用广播变量?我见过一些例子,人们在存在广播变量的同一文件中定义 udf 并且它似乎工作正常。但由于代码量很大,我需要将它们放在单独的文件中。我有什么选择?
编辑:我不想序列化该对象,在调用期间将其传递给 UDF 并在 UDF 内反序列化。我认为这违背了广播变量的目的。
| 归档时间: |
|
| 查看次数: |
515 次 |
| 最近记录: |