相关疑难解决方法(0)

访问Spark RDD时在闭包中使用局部变量

我有一个关于在访问Spark RDD时在闭包中使用局部变量的问题.我想解决的问题如下:

我有一个应该读入RDD的文本文件列表.但是,首先我需要向从单个文本文件创建的RDD添加其他信息.从文件名中提取此附加信息.然后,使用union()将RDD放入一个大的RDD中.

from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)   
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work: 
# The result is that always Bert will be the owner, i.e., never Ernie.
Run Code Online (Sandbox Code Playgroud)

问题是循环中的map()函数没有引用"正确的"file_owner.相反,它将引用file_owner的最新值.在我的本地机器上,我设法通过为每个RDD调用cache()函数来解决问题:

# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner)) …
Run Code Online (Sandbox Code Playgroud)

closures apache-spark rdd pyspark

6
推荐指数
1
解决办法
4757
查看次数

标签 统计

apache-spark ×1

closures ×1

pyspark ×1

rdd ×1