Spark:仅在路径存在时读取文件

Dar*_*hta 10 scala apache-spark parquet

我正在尝试阅读Sequencescala中Paths中的文件.下面是示例(伪)代码:

val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)
Run Code Online (Sandbox Code Playgroud)

现在,在上面的序列中,存在一些路径而一些路径不存在.有没有办法在读取parquet文件时忽略丢失的路径(避免org.apache.spark.sql.AnalysisException: Path does not exist)?

我已经尝试了以下它似乎工作,但是,然后,我最终读取相同的路径两次,这是我想避免做的事情:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)
Run Code Online (Sandbox Code Playgroud)

我检查了options方法,DataFrameReader但似乎没有任何类似的选项ignore_if_missing.

此外,这些路径可以是hdfss3(这Seq是作为一个方法参数传递),并一边读书,我不知道一个路径是s3hdfs因此无法使用s3hdfs特定的API,以检查是否存在.

Ass*_*son 12

您可以像在@ Psidom的答案中过滤掉不相关的文件.在spark中,最好的方法是使用内部spark hadoop配置.鉴于spark session变量被称为"spark",你可以这样做:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def testDirExist(path: String): Boolean = {
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)
Run Code Online (Sandbox Code Playgroud)

  • 根据您的系统设置,您可能需要在 get 中指定您的文件系统位置:`FileSystem.get(new URI("s3://bucket"), spark.sparkContext.hadoopConfiguration)`。否则,它可能会创建一个 HDFS 文件系统,并且在检查 S3 文件系统的路径时会失败。 (5认同)

Psi*_*dom 3

过滤第一个怎么样paths

paths.filter(f => new java.io.File(f).exists)
Run Code Online (Sandbox Code Playgroud)

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)
Run Code Online (Sandbox Code Playgroud)

  • 对于 S3,您可能需要检查 [doesObjectExist](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3.html#doesObjectExist-java.lang.String- java.lang.String-),对于 hdfs,您可以看到[这个答案](/sf/ask/2128400991/)。 (2认同)