fly*_*all 3 python apache-spark pyspark spark-dataframe jupyter-notebook
我有一个Spark 2.0.2集群,我通过Puppark通过Jupyter Notebook进行攻击.我有多个管道分隔的txt文件(加载到HDFS.但也可在本地目录中使用),我需要使用spark-csv加载到三个独立的数据帧中,具体取决于文件的名称.
我看到我可以采取的三种方法 - 要么我可以使用python以某种方式遍历HDFS目录(尚未弄清楚如何执行此操作,加载每个文件然后执行联合.
我也知道在spark中存在一些通配符功能(见这里) - 我可以利用它
最后,我可以使用pandas从磁盘加载vanilla csv文件作为pandas数据帧,然后创建一个spark数据帧.这里的缺点是这些文件很大,并且在单个节点上加载到内存中可能需要大约8GB.(这就是为什么它首先转移到集群).
这是我到目前为止的代码和两个方法的一些伪代码:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')
spark = SparkSession(sc)
#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')
#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
pd_df = pd.read_csv(currFile, sep = '|')
df = spark.createDataFrame(pd_df)
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
Run Code Online (Sandbox Code Playgroud)
有谁知道如何实现方法1或2?我无法弄清楚这些.另外,我很惊讶没有更好的方法将csv文件加载到pyspark数据框中 - 使用第三方软件包看起来应该是本机功能让我感到困惑(我是否只是错过了标准用例)用于将csv文件加载到数据帧中?)最后,我将把一个统一的单个数据帧写回HDFS(使用.write.parquet()),这样我就可以清除内存并使用MLlib进行一些分析.如果我强调的方法不是最佳实践,我会很感激推动正确的方向!
方法1:
在python中你不能直接引用HDFS位置.你需要得到像pydoop这样的另一个图书馆的帮助.在scala和java中,你有API.即使使用pydoop,您也将逐个阅读文件.一个接一个地读取文件并且不使用spark提供的并行读取选项是不好的.
方法2:
您应该可以使用逗号分隔或使用通配符指向多个文件.这样spark就会负责读取文件并将它们分配到分区中.但是,如果你对每个数据框使用union选项,那么当你动态读取每个文件时会有一个边缘情况.当您拥有大量文件时,列表可能会在驱动程序级别变得如此巨大并且可能导致内存问题.主要原因是,读取过程仍然发生在驱动程序级别.
这个选项更好.spark将读取与regex相关的所有文件并将其转换为分区.你得到一个RDD用于所有的通配符匹配,从那里你不需要担心个别rdd的联合
示例代码cnippet:
distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")
Run Code Online (Sandbox Code Playgroud)
方法3:
除非你在使用pandas功能的python中有一些遗留应用程序,否则我更愿意使用spark提供的API
| 归档时间: |
|
| 查看次数: |
11283 次 |
| 最近记录: |