Pyspark rdd.toLocalIterator不会遍历所有数据分区

Moh*_*OUI 6 python apache-spark rdd pyspark

我有一些zip文件,有数百个zip文件:

 Parent_1.zip: [child1.zip, ..., childM.zip]
 Parent_..zip: [child1.zip, ..., childN.zip]
 Parent_P.zip: [child1.zip, ..., childL.zip]
Run Code Online (Sandbox Code Playgroud)

我通过将二进制文件加载到rdd中解压缩它们,然后使用flatmap应用一个函数来提取csv文件作为字符串foreach child.zip块.

zips = sc.binaryFiles(data_files)
files_data = zips.flatMap(zip_extract_stores)
Run Code Online (Sandbox Code Playgroud)

一旦提取了所有的zip块,我总共获得了748个块.

print(files_data.count()) #  => 748 chunks 
Run Code Online (Sandbox Code Playgroud)

现在,我正在尝试将files_datardd中的csv字符串转换为单个数据帧.但我面临的问题很少.

这是我尝试过的.

尝试1:

  • 应用collect()在内存中提取名单如下:files_data = zips.flatMap(zip_extract_stores).collect()
  • 然后将块转换为数据帧列表 dataframes = [get_dataframe(data) for data in files_data]
  • 然后将数据帧合并为一个数据帧: merge_list_of_dataframes(dataframes)

这在local模式下运行时工作正常,但cluster由于内存问题导致模式失败.

尝试2:

  • 给那个, files_data = zips.flatMap(zip_extract_stores)
  • 我尝试files_data使用rdd迭代toLocalIterator()

    for idx, data in enumerate(files_data.toLocalIterator()):
        if idx % 100 == 0:
            print("Loaded {} dataframes".format(idx))
        dataframes.append(get_dataframe(data))
    
    Run Code Online (Sandbox Code Playgroud)
  • 然后将数据框合并为一个 merge_list_of_dataframes(dataframes)

这在local模式下工作正常,但在cluster模式下,最终数据帧的总计数从一次运行到另一次运行是不同的.例如,一次运行总计数为100万,而另一次运行总计数为500万,尽管总预期计数为800万.

问题:

  • 如何稳定files_data.toLocalIterator()所以它消耗所有数据?
  • 否则,是否有另一种方法可以实现我想要做的事情,即将一串CSV字符串转换为单个数据帧?