尝试访问 Azure Databricks 中的 Azure DBFS 文件系统时出现装载错误

Dat*_*ice 9 python azure databricks azure-databricks

我能够建立与 Databricks FileStore 的连接DBFS并访问文件存储。

使用 Pyspark 读取、写入和转换数据是可能的,但是当我尝试使用本地 Python API(例如pathlibOS模块)时,我无法通过 DBFS 文件系统的第一级

我可以使用一个神奇的命令:

%fs ls dbfs:\mnt\my_fs\...哪个工作完美并列出所有子目录?

但如果我这样做,它会作为返回值os.listdir('\dbfs\mnt\my_fs\')返回['mount.err']

我已经在新集群上进行了测试,结果是相同的

我在 Databricks Runtine 版本 6.1 和 Apache Spark 2.4.4 上使用 Python

有谁能提供建议吗?

编辑 :

连接脚本:

我使用 Databricks CLI 库来存储根据 databricks 文档格式化的凭据:

 def initialise_connection(secrets_func):
  configs = secrets_func()
  # Check if the mount exists
  bMountExists = False
  for item in dbutils.fs.ls("/mnt/"):
      if str(item.name) == r"WFM/":
          bMountExists = True
      # drop if exists to refresh credentials
      if bMountExists:
        dbutils.fs.unmount("/mnt/WFM")
        bMountExists = False

      # Mount a drive
      if not (bMountExists):
          dbutils.fs.mount(
              source="adl://test.azuredatalakestore.net/WFM",
              mount_point="/mnt/WFM",
              extra_configs=configs
          )
          print("Drive mounted")
      else:
          print("Drive already mounted")
Run Code Online (Sandbox Code Playgroud)

dan*_*alk 5

当同一个容器安装到工作区中的两个不同路径时,我们遇到了此问题。全部卸载并重新安装解决了我们的问题。我们使用的是 Databricks 版本 6.2(Spark 2.4.4、Scala 2.11)。我们的 blob 存储容器配置:

  • 性能/访问层:标准/热门
  • 复制:读取访问异地冗余存储 (RA-GRS)
  • 账户类型:StorageV2(通用v2)

要运行以卸载所有安装的笔记本脚本/mnt

# Iterate through all mounts and unmount 
print('Unmounting all mounts beginning with /mnt/')
dbutils.fs.mounts()
for mount in dbutils.fs.mounts():
  if mount.mountPoint.startswith('/mnt/'):
    dbutils.fs.unmount(mount.mountPoint)

# Re-list all mount points
print('Re-listing all mounts')
dbutils.fs.mounts()
Run Code Online (Sandbox Code Playgroud)

在自动化作业集群上测试的最少作业

假设您有一个单独的过程来创建安装座。创建作业定义 ( job.json) 以在自动化集群上运行 Python 脚本:

{
  "name": "Minimal Job",
  "new_cluster": {
    "spark_version": "6.2.x-scala2.11",
    "spark_conf": {},
    "node_type_id": "Standard_F8s",
    "driver_node_type_id": "Standard_F8s",
    "num_workers": 2,
    "enable_elastic_disk": true,
    "spark_env_vars": {
      "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    }
  },
  "timeout_seconds": 14400,
  "max_retries": 0,
  "spark_python_task": {
    "python_file": "dbfs:/minimal/job.py"
  }
}
Run Code Online (Sandbox Code Playgroud)

job.py用于打印坐骑的Python 文件 ( ):

{
  "name": "Minimal Job",
  "new_cluster": {
    "spark_version": "6.2.x-scala2.11",
    "spark_conf": {},
    "node_type_id": "Standard_F8s",
    "driver_node_type_id": "Standard_F8s",
    "num_workers": 2,
    "enable_elastic_disk": true,
    "spark_env_vars": {
      "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    }
  },
  "timeout_seconds": 14400,
  "max_retries": 0,
  "spark_python_task": {
    "python_file": "dbfs:/minimal/job.py"
  }
}
Run Code Online (Sandbox Code Playgroud)

运行 databricks CLI 命令来运行作业。查看 Spark Driver 日志的输出,确认mount.err不存在。

import os

path_mounts = '/dbfs/mnt/'
print(f"Listing contents of {path_mounts}:")
print(os.listdir(path_mounts))

path_mount = path_mounts + 'YOURCONTAINERNAME'
print(f"Listing contents of {path_mount }:")
print(os.listdir(path_mount))
Run Code Online (Sandbox Code Playgroud)