如何使用 Spark Session 列出 S3 存储桶中的文件?

cod*_*ark 8 amazon-s3 apache-spark apache-spark-sql

是否可以使用 SparkSession 对象列出给定 S3 路径(例如:s3://my-bucket/my-folder/*.extension)中的所有文件?

Mic*_*tor 14

您可以使用 Hadoop API 访问 S3 上的文件(Spark 也使用它):

import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val path = "s3://somebucket/somefolder"
val fileSystem = FileSystem.get(URI.create(path), new Configuration())
val it = fileSystem.listFiles(new Path(path), true)
while (it.hasNext()) {
  ...
}
Run Code Online (Sandbox Code Playgroud)


Lou*_*ell 12

方法一

对于 pyspark 用户,我翻译了 Michael Spector 的答案(我将留给您来决定使用这是否是一个好主意):

sc = spark.sparkContext
myPath = f's3://my-bucket/my-prefix/'
javaPath = sc._jvm.java.net.URI.create(myPath)
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem.get(javaPath, sc._jvm.org.apache.hadoop.conf.Configuration())
iterator = hadoopFileSystem.listFiles(hadoopPath, True)

s3_keys = []
while iterator.hasNext():
    s3_keys.append(iterator.next().getPath().toUri().getRawPath())    
Run Code Online (Sandbox Code Playgroud)

s3_keys现在保存在以下位置找到的所有文件密钥my-bucket/my-prefix

方法 2 这是我发现的另一种选择(向@forgetso致敬):

myPath = 's3://my-bucket/my-prefix/*'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
  status.getPath().toUri().getRawPath()
  # Alternatively, you can get file names only with:
  # status.getPath().getName()
Run Code Online (Sandbox Code Playgroud)

方法 3(不完整!)

上述两种方法不使用应用于分布式读取的 Spark 并行机制。不过,这种逻辑看起来很私密。看parallelListLeafFiles 这里。我还没有找到一种方法来强制 pyspark 在 s3 上进行分发ls而不读取文件内容。我尝试使用 py4j 实例化 an InMemoryFileIndex,但无法获得正确的咒语。如果有人想从这里获取的话,这是我到目前为止所拥有的:

myPath = f's3://my-bucket/my-path/'
paths = sc._gateway.new_array(sc._jvm.org.apache.hadoop.fs.Path, 1)
paths[0] = sc._jvm.org.apache.hadoop.fs.Path(myPath)

emptyHashMap = sc._jvm.java.util.HashMap()
emptyScalaMap = sc._jvm.scala.collection.JavaConversions.mapAsScalaMap(emptyMap)

# Py4J is not happy with this:
sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    spark._jsparkSession, 
    paths, 
    emptyScalaMap, 
    sc._jvm.scala.Option.empty() # Optional None
)
Run Code Online (Sandbox Code Playgroud)