将多个 S3 文件夹/路径读入 PySpark

lse*_*ary 3 python amazon-s3 pyspark jupyter-notebook

我正在使用 PySpark 进行大数据分析。我可以使用以下命令导入存储在特定存储桶的特定文件夹中的所有 CSV 文件:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/path/datafolder/data2014/*.csv')
Run Code Online (Sandbox Code Playgroud)

(其中 * 就像通配符一样)

我遇到的问题如下:

  1. 如果我想对 2014 年和 2015 年的数据进行分析,即文件 1 是.load('file:///home/path/SFweather/data2014/*.csv'),文件 2 是.load('file:///home/path/SFweather/data2015/*.csv'),文件 3 是.load('file:///home/path/NYCweather/data2014/*.csv'),文件 4 是.load('file:///home/path/NYCweather/data2015/*.csv')。如何同时导入多个路径以获取一个数据框?我是否需要将它们全部单独存储为数据帧,然后在 PySpark 中将它们连接在一起?(您可以假设它们所有 CSV 都具有相同的架构)
  2. 假设现在是 2014 年 11 月。如果我想再次运行分析,但在 2014 年 12 月时运行“最新数据”,例如 dec14,该怎么办?例如,我想.load('file:///home/path/datafolder/data2014/dec14/*.csv')在 12 月 14 日加载文件 2:并使用此文件:.load('file:///home/path/datafolder/data2014/nov14/*.csv')进行原始分析。有没有办法安排 Jupyter 笔记本(或类似的)来更新加载路径并导入最新的运行(在这种情况下,'nov14' 将被替换为 'dec14' 然后是 'jan15' 等)。

我查看了之前的问题,但无法找到答案,因为这是特定于 AWS / PySpark 集成的。

预先感谢您的帮助!

[背景:我已经获得了来自不同团队的许多包含各种大数据集的 S3 存储桶的访问权限。将其复制到我的 S3 存储桶,然后构建 Jupyter 笔记本似乎比直接从存储桶中提取数据并在其上构建模型/表/等并将处理后的输出保存到数据库中的工作要多得多。因此,我发布了上述问题。如果我的想法完全错误,请阻止我!:)]

Bob*_*ain 7

只要文件都采用相同的格式,您就可以使用通配符读取多个路径。

在你的例子中:

.load('file:///home/path/SFweather/data2014/*.csv')
.load('file:///home/path/SFweather/data2015/*.csv')
.load('file:///home/path/NYCweather/data2014/*.csv')
.load('file:///home/path/NYCweather/data2015/*.csv')
Run Code Online (Sandbox Code Playgroud)

您可以将上面的 4 个加载语句替换为以下路径,以一次将所有 csv 读取到一个数据帧中:

.load('file:///home/path/*/*/*.csv')
Run Code Online (Sandbox Code Playgroud)

如果您想更具体地避免读取某些文件/文件夹,您可以执行以下操作:

.load('file:///home/path/[SF|NYC]weather/data201[4|5]/*.csv')
Run Code Online (Sandbox Code Playgroud)