如何使用pyspark检查文件/文件夹是否存在而不会出现异常

Ama*_*ddy 4 pyspark azure-databricks

在从数据块中的 pyspark 读取文件之前,我试图检查该文件是否存在以避免异常?我尝试了以下代码片段,但是当文件不存在时出现异常

from pyspark.sql import *
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
try:
    df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter",",").options(header='true', inferschema='true').load('/FileStore/tables/HealthCareSample_dumm.csv')
    print("File Exists")
except IOError:
    print("file not found")`
Run Code Online (Sandbox Code Playgroud)

当我有文件时,它会读取文件并“打印文件存在”,但是当文件不存在时,它会抛出“AnalysisException:'路径不存在:dbfs:/FileStore/tables/HealthCareSample_dumm.csv;'”

ros*_*fun 13

谢谢@Dror 和@Kini。我在集群上运行spark,我必须添加sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),这里s3是集群文件系统的前缀。

  def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
Run Code Online (Sandbox Code Playgroud)


Pra*_*ini 5

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
Run Code Online (Sandbox Code Playgroud)

  • 对于 S3 文件系统,这种方法会失败。[这是 S3 的解决方案](https://gist.github.com/drorata/73c15740f83d0cb0b187d00a57ca74a1#check-file-existence-on-s3-using-pyspark)。 (2认同)

Nik*_*iya 5

@rosefun 发布的答案对我有用,但我花了很多时间才让它发挥作用。因此,我将详细介绍该解决方案的工作原理以及您应该避免哪些内容。

def path_exists(path):
    # spark is a SparkSession
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
Run Code Online (Sandbox Code Playgroud)

功能相同,并且可以很好地检查您提供的 S3 存储桶路径中是否存在文件。

您必须根据您指定此函数的路径值的方式来更改此函数。

path = f"s3://bucket-name/import/data/"
pathexists = path_exists(path)
Run Code Online (Sandbox Code Playgroud)

如果您定义的路径变量在路径中具有 s3 前缀,那么它就可以工作。

此外,分割字符串的代码部分只提供存储桶名称,如下所示:

path.split("/")[2] will give you `bucket-name`
Run Code Online (Sandbox Code Playgroud)

但如果路径中没有 s3 前缀,那么您将必须通过更改一些代码来使用该函数,如下所示:

def path_exists(path):
   # spark is a SparkSession
   sc = spark.sparkContext
   fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path),
        sc._jsc.hadoopConfiguration(),
   )
   return fs.exists(sc._jvm.org.apache.hadoop.fs.Path("s3://" + path))
Run Code Online (Sandbox Code Playgroud)


Eli*_*lul 1

很高兴在 StackOverFlow 上见到您。

我第二个dijksterhuis的解决方案,有一个例外 - Analysis Exception是Spark中非常普遍的异常,可能是由于多种原因导致的,而不仅仅是由于丢失文件。

如果你想检查文件是否存在,你需要绕过Spark的FS抽象,直接访问存储系统(无论是s3、posix还是其他)。该解决方案的缺点是缺乏抽象 - 一旦您更改底层文件系统,您也需要更改代码。