PySpark从本地函数广播变量

Mag*_*sol 11 python apache-spark pyspark

我正在尝试从Python方法中创建广播变量(尝试抽象我正在创建的依赖于分布式操作的一些实用程序方法).但是,我似乎无法从Spark工作者中访问广播变量.

假设我有这个设置:

def main():
    sc = SparkContext()
    SomeMethod(sc)

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value  ### NameError: global name 'V' is not defined ###
Run Code Online (Sandbox Code Playgroud)

但是,如果我改为消除SomeMethod()中间人,它就可以了.

def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value   # works just fine
Run Code Online (Sandbox Code Playgroud)

如果可以的话,我宁愿不必将所有Spark逻辑放在main方法中.有没有办法从本地函数中广播变量并让它们对Spark工作者全局可见?

或者,对于这种情况,什么是一个好的设计模式 - 例如,我想专门为Spark编写一个自包含的方法,并执行我想重用的特定功能?

ely*_*ase 16

我不确定我是否完全理解了这个问题但是,如果你需要Vworker函数中的对象,那么你肯定应该将它作为参数传递,否则该方法实际上不是自包含的:

def worker(V, element):
    element *= V.value
Run Code Online (Sandbox Code Playgroud)

现在为了在map函数中使用它,你需要使用partial,这样map只能看到1参数函数:

from functools import partial

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(partial(worker, V=V))
Run Code Online (Sandbox Code Playgroud)

  • 这样传递广播变量对性能有影响吗?举例来说,我依赖于map()函数中成千上万(或更多)行的广播变量。类似于`def transform(row):return broadcast_variable.value [row [0]]`,然后将其用于`rdd.map(transform)之类的`map()`函数中。 (2认同)
  • 谢谢这个解决方案帮助我避免使用全局广播变量.请注意,您应该替换worker方法参数的顺序,以便首先使用'element'参数(由Spark框架填充).否则它将无法工作. (2认同)