Moh*_*OUI 6 python apache-spark rdd pyspark
我有一些zip文件,有数百个zip文件:
Parent_1.zip: [child1.zip, ..., childM.zip]
Parent_..zip: [child1.zip, ..., childN.zip]
Parent_P.zip: [child1.zip, ..., childL.zip]
Run Code Online (Sandbox Code Playgroud)
我通过将二进制文件加载到rdd中解压缩它们,然后使用flatmap应用一个函数来提取csv文件作为字符串foreach child.zip块.
zips = sc.binaryFiles(data_files)
files_data = zips.flatMap(zip_extract_stores)
Run Code Online (Sandbox Code Playgroud)
一旦提取了所有的zip块,我总共获得了748个块.
print(files_data.count()) # => 748 chunks
Run Code Online (Sandbox Code Playgroud)
现在,我正在尝试将files_datardd中的csv字符串转换为单个数据帧.但我面临的问题很少.
这是我尝试过的.
collect()在内存中提取名单如下:files_data = zips.flatMap(zip_extract_stores).collect()dataframes = [get_dataframe(data) for data in files_data]merge_list_of_dataframes(dataframes)这在local模式下运行时工作正常,但cluster由于内存问题导致模式失败.
files_data = zips.flatMap(zip_extract_stores)我尝试files_data使用rdd迭代toLocalIterator()
for idx, data in enumerate(files_data.toLocalIterator()):
if idx % 100 == 0:
print("Loaded {} dataframes".format(idx))
dataframes.append(get_dataframe(data))
Run Code Online (Sandbox Code Playgroud)然后将数据框合并为一个 merge_list_of_dataframes(dataframes)
这在local模式下工作正常,但在cluster模式下,最终数据帧的总计数从一次运行到另一次运行是不同的.例如,一次运行总计数为100万,而另一次运行总计数为500万,尽管总预期计数为800万.
files_data.toLocalIterator()所以它消耗所有数据?| 归档时间: |
|
| 查看次数: |
689 次 |
| 最近记录: |