PySpark评估

srj*_*jit 6 python apache-spark pyspark

我正在尝试以下代码,它为RDD中的每一行添加一个数字,并使用PySpark返回一个RDD列表.

from pyspark.context import SparkContext
file  = "file:///home/sree/code/scrap/sample.txt"
sc = SparkContext('local', 'TestApp')
data = sc.textFile(file) 
splits = [data.map(lambda p :  int(p) + i) for i in range(4)]
print splits[0].collect()
print splits[1].collect()
print splits[2].collect()
Run Code Online (Sandbox Code Playgroud)

输入文件(sample.txt)中的内容是:

1
2
3
Run Code Online (Sandbox Code Playgroud)

我期待这样的输出(在rdd中分别添加0,1,2的数字):

[1,2,3]
[2,3,4]
[3,4,5]
Run Code Online (Sandbox Code Playgroud)

而实际产出是:

[4, 5, 6]
[4, 5, 6]
[4, 5, 6]
Run Code Online (Sandbox Code Playgroud)

这意味着理解仅使用变量i的值3,而不考虑范围(4).

为什么会出现这种情况?

zer*_*323 4

发生这种情况是因为 Python 后期绑定,而不是 (Py)Spark 特定的。i将在使用时查找lambda p : int(p) + i,而不是在定义时查找。通常,这意味着它被调用时,但在这个特定的上下文中,它是当它被序列化以发送给工作人员时。

例如,您可以执行以下操作:

def f(i):
    def _f(x):
        try:
            return int(x) + i
        except:
            pass
    return _f

data = sc.parallelize(["1", "2", "3"])
splits = [data.map(f(i)) for i in range(4)]
[rdd.collect() for rdd in splits]
## [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
Run Code Online (Sandbox Code Playgroud)