在 for 循环内将映射函数附加到 PySpark RDD

Mar*_*kNS 5 apache-spark pyspark

有人可以帮我理解在 python for 循环中将映射函数附加到 RDD 的行为吗?

对于以下代码:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))

rdd.collect()
Run Code Online (Sandbox Code Playgroud)

我得到输出:

[[1, 2, 2, 2], [2, 2, 2, 2], [3, 2, 2, 2]]
Run Code Online (Sandbox Code Playgroud)

而使用以下代码:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()
Run Code Online (Sandbox Code Playgroud)

我得到预期的输出:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]
Run Code Online (Sandbox Code Playgroud)

我想这与传递给 PySpark 编译器的闭包有关,但我找不到任何关于此的文档......

Kon*_*ewa 1

我最好的猜测是因为懒惰的评估:而且你的范围也很糟糕。

这两个代码片段产生相同的输出:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(1,4):
    rdd = spark.sparkContext.parallelize(rdd.map(lambda x: appender(x, i)).collect())

rdd.collect()
Run Code Online (Sandbox Code Playgroud)

输出:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]
Run Code Online (Sandbox Code Playgroud)

第二个:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()
Run Code Online (Sandbox Code Playgroud)

输出:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]
Run Code Online (Sandbox Code Playgroud)

另外,为了显示简化示例中 for 循环中发生的情况(仅输入 1 和 2 ),并使用修改后的附加函数来打印 l 参数:

  1. for 循环打印:

    [2]
    [2, 2]
    [1]
    [3]
    [1, 2]
    [3, 2]
    
    Run Code Online (Sandbox Code Playgroud)

首先它从输入列表中获取第二个字段

  1. 映射器输出的显式写法是:

    [1]
    [1, 1]
    [2]
    [2, 1]
    [3]
    [3, 1]
    
    Run Code Online (Sandbox Code Playgroud)