Ama*_*ddy 4 pyspark azure-databricks
在从数据块中的 pyspark 读取文件之前,我试图检查该文件是否存在以避免异常?我尝试了以下代码片段,但是当文件不存在时出现异常
from pyspark.sql import *
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
try:
df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter",",").options(header='true', inferschema='true').load('/FileStore/tables/HealthCareSample_dumm.csv')
print("File Exists")
except IOError:
print("file not found")`
Run Code Online (Sandbox Code Playgroud)
当我有文件时,它会读取文件并“打印文件存在”,但是当文件不存在时,它会抛出“AnalysisException:'路径不存在:dbfs:/FileStore/tables/HealthCareSample_dumm.csv;'”
ros*_*fun 13
谢谢@Dror 和@Kini。我在集群上运行spark,我必须添加sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),这里s3是集群文件系统的前缀。
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
Run Code Online (Sandbox Code Playgroud)
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
Run Code Online (Sandbox Code Playgroud)
@rosefun 发布的答案对我有用,但我花了很多时间才让它发挥作用。因此,我将详细介绍该解决方案的工作原理以及您应该避免哪些内容。
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
Run Code Online (Sandbox Code Playgroud)
功能相同,并且可以很好地检查您提供的 S3 存储桶路径中是否存在文件。
您必须根据您指定此函数的路径值的方式来更改此函数。
path = f"s3://bucket-name/import/data/"
pathexists = path_exists(path)
Run Code Online (Sandbox Code Playgroud)
如果您定义的路径变量在路径中具有 s3 前缀,那么它就可以工作。
此外,分割字符串的代码部分只提供存储桶名称,如下所示:
path.split("/")[2] will give you `bucket-name`
Run Code Online (Sandbox Code Playgroud)
但如果路径中没有 s3 前缀,那么您将必须通过更改一些代码来使用该函数,如下所示:
def path_exists(path):
# spark is a SparkSession
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
sc._jvm.java.net.URI.create("s3://" + path),
sc._jsc.hadoopConfiguration(),
)
return fs.exists(sc._jvm.org.apache.hadoop.fs.Path("s3://" + path))
Run Code Online (Sandbox Code Playgroud)
很高兴在 StackOverFlow 上见到您。
我第二个dijksterhuis的解决方案,有一个例外 - Analysis Exception是Spark中非常普遍的异常,可能是由于多种原因导致的,而不仅仅是由于丢失文件。
如果你想检查文件是否存在,你需要绕过Spark的FS抽象,直接访问存储系统(无论是s3、posix还是其他)。该解决方案的缺点是缺乏抽象 - 一旦您更改底层文件系统,您也需要更改代码。
| 归档时间: |
|
| 查看次数: |
7581 次 |
| 最近记录: |