Pyspark将多个csv文件读入数据帧(或RDD?)

fly*_*all 3 python apache-spark pyspark spark-dataframe jupyter-notebook

我有一个Spark 2.0.2集群,我通过Puppark通过Jupyter Notebook进行攻击.我有多个管道分隔的txt文件(加载到HDFS.但也可在本地目录中使用),我需要使用spark-csv加载到三个独立的数据帧中,具体取决于文件的名称.

我看到我可以采取的三种方法 - 要么我可以使用python以某种方式遍历HDFS目录(尚未弄清楚如何执行此操作,加载每个文件然后执行联合.

我也知道在spark中存在一些通配符功能(见这里) - 我可以利用它

最后,我可以使用pandas从磁盘加载vanilla csv文件作为pandas数据帧,然后创建一个spark数据帧.这里的缺点是这些文件很大,并且在单个节点上加载到内存中可能需要大约8GB.(这就是为什么它首先转移到集群).

这是我到目前为止的代码和两个方法的一些伪代码:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df
Run Code Online (Sandbox Code Playgroud)

有谁知道如何实现方法1或2?我无法弄清楚这些.另外,我很惊讶没有更好的方法将csv文件加载到pyspark数据框中 - 使用第三方软件包看起来应该是本机功能让我感到困惑(我是否只是错过了标准用例)用于将csv文件加载到数据帧中?)最后,我将把一个统一的单个数据帧写回HDFS(使用.write.parquet()),这样我就可以清除内存并使用MLlib进行一些分析.如果我强调的方法不是最佳实践,我会很感激推动正确的方向!

Ram*_*mzy 9

方法1:

在python中你不能直接引用HDFS位置.你需要得到像pydoop这样的另一个图书馆的帮助.在scala和java中,你有API.即使使用pydoop,您也将逐个阅读文件.一个接一个地读取文件并且不使用spark提供的并行读取选项是不好的.

方法2:

您应该可以使用逗号分隔或使用通配符指向多个文件.这样spark就会负责读取文件并将它们分配到分区中.但是,如果你对每个数据框使用union选项,那么当你动态读取每个文件时会有一个边缘情况.当您拥有大量文件时,列表可能会在驱动程序级别变得如此巨大并且可能导致内存问题.主要原因是,读取过程仍然发生在驱动程序级别.

这个选项更好.spark将读取与regex相关的所有文件并将其转换为分区.你得到一个RDD用于所有的通配符匹配,从那里你不需要担心个别rdd的联合

示例代码cnippet:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")
Run Code Online (Sandbox Code Playgroud)

方法3:

除非你在使用pandas功能的python中有一些遗留应用程序,否则我更愿意使用spark提供的API