Wae*_*eCo 2 python apache-spark pyspark
我有一个包含我的数据文件的文件夹。每个文件的大小约为1 GB。
我需要的是RDD中的文件名。
以下无法正常工作:
import glob
rdds = []
for filename in glob.iglob('/data/*'):
rdd = sc.textFile(filename).map(lambda row: (filename, row))
rdds.append(rdd)
allData = sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)
使用它,filename始终是最后读取文件的文件名
我还尝试了什么:
import glob
rdds = []
for filename in glob.iglob('/data/*'):
def f(name=filename):
return name
rdd = sc.textFile(filename).map(lambda row: (f(), row))
rdds.append(rdd)
allData = sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)
但这会产生错误: Broadcast can only be serialized in driver
sc.wholeTextFile() 不能选择,因为单个文件太大。
我想你想要这样的东西:
import functools
def proc(f):
return sc.textFile(f).map(lambda x: (f, x))
rdd = functools.reduce(
lambda rdd1, rdd2: rdd1.union(rdd2),
(proc(f) for f in glob.glob("/data/*")))
Run Code Online (Sandbox Code Playgroud)
或搭配sc.union:
sc.union([proc(f) for f in glob.glob("/data/*")])
Run Code Online (Sandbox Code Playgroud)