I am able to establish a connection to the Databricks FileStore (DBFS) and access the filestore. Reading, writing, and transforming data with Pyspark is possible, but when I try to use a local Python API such as pathlib or the os module, I am unable to get past the first level of the DBFS file system.
I can use a magic command:

%fs ls dbfs:/mnt/my_fs/...

which works perfectly and lists all of the child directories. However, if I call os.listdir('/dbfs/mnt/my_fs/'), it returns ['mount.err'] as the result.
I have tested this on a new cluster and the result is the same.

I am using Python on Databricks Runtime version 6.1 with Apache Spark 2.4.4.

Is anyone able to advise?
Connection script: I have used the Databricks CLI library to store my credentials, formatted according to the Databricks documentation:
def initialise_connection(secrets_func):
    configs = secrets_func()

    # Check if the mount already exists
    bMountExists = False
    for item in dbutils.fs.ls("/mnt/"):
        if str(item.name) == r"WFM/":
            bMountExists = True

    # Drop the mount if it exists, to refresh credentials
    if bMountExists:
        dbutils.fs.unmount("/mnt/WFM")
        bMountExists = False

    # Mount the drive
    if not bMountExists:
        dbutils.fs.mount(
            source="adl://test.azuredatalakestore.net/WFM",
            mount_point="/mnt/WFM",
            extra_configs=configs
        )
        print("Drive mounted")
    else:
        print("Drive already mounted")
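For reference, a minimal sketch of how such a function might be invoked; the secret scope and key names below are hypothetical placeholders, not part of the original post, and follow the documented ADLS Gen1 (adl://) OAuth configuration keys:

def get_adls_configs():
    # Hypothetical secret scope and key names; replace with your own.
    return {
        "dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
        "dfs.adls.oauth2.client.id": dbutils.secrets.get(scope="my_scope", key="client_id"),
        "dfs.adls.oauth2.credential": dbutils.secrets.get(scope="my_scope", key="client_secret"),
        "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
    }

initialise_connection(get_adls_configs)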
We experienced this issue when the same container was mounted to two different paths in the workspace. Unmounting everything and remounting resolved our issue. We were using Databricks version 6.2 (Spark 2.4.4, Scala 2.11) with an Azure blob storage container.
Notebook script to run that unmounts all mounts under /mnt:
# Iterate through all mount points under /mnt/ and unmount them
print('Unmounting all mounts beginning with /mnt/')
for mount in dbutils.fs.mounts():
    if mount.mountPoint.startswith('/mnt/'):
        dbutils.fs.unmount(mount.mountPoint)

# Re-list the mount points that remain
print('Re-listing all mounts')
print(dbutils.fs.mounts())
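The mounts then have to be recreated. As a sketch only (the storage account, container, and secret names here are hypothetical), remounting a blob storage container follows the documented wasbs:// pattern:

# Hypothetical storage account, container, and secret names.
storage_account = "mystorageaccount"
container = "mycontainer"

dbutils.fs.mount(
    source=f"wasbs://{container}@{storage_account}.blob.core.windows.net",
    mount_point=f"/mnt/{container}",
    extra_configs={
        f"fs.azure.account.key.{storage_account}.blob.core.windows.net":
            dbutils.secrets.get(scope="my_scope", key="storage_account_key")
    }
)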
This assumes that you have a separate process to create the mounts. Create a job definition (job.json) to run the Python script on an automated cluster:
{
  "name": "Minimal Job",
  "new_cluster": {
    "spark_version": "6.2.x-scala2.11",
    "spark_conf": {},
    "node_type_id": "Standard_F8s",
    "driver_node_type_id": "Standard_F8s",
    "num_workers": 2,
    "enable_elastic_disk": true,
    "spark_env_vars": {
      "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    }
  },
  "timeout_seconds": 14400,
  "max_retries": 0,
  "spark_python_task": {
    "python_file": "dbfs:/minimal/job.py"
  }
}
The Python file (job.py) that prints the mounts:
import os

path_mounts = '/dbfs/mnt/'
print(f"Listing contents of {path_mounts}:")
print(os.listdir(path_mounts))

path_mount = path_mounts + 'YOURCONTAINERNAME'
print(f"Listing contents of {path_mount}:")
print(os.listdir(path_mount))
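The job definition expects job.py to already exist at dbfs:/minimal/job.py; the Databricks CLI can copy it there (databricks fs cp job.py dbfs:/minimal/job.py). As a sketch of the equivalent DBFS REST API call (the workspace URL and token are placeholders):

import base64
import requests

HOST = "https://<your-workspace>.azuredatabricks.net"  # placeholder workspace URL
TOKEN = "<personal-access-token>"                      # placeholder token

# The DBFS put endpoint takes base64-encoded contents (suitable for small files)
with open("job.py", "rb") as f:
    contents = base64.b64encode(f.read()).decode()

resp = requests.post(
    f"{HOST}/api/2.0/dbfs/put",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"path": "/minimal/job.py", "contents": contents, "overwrite": True},
)
resp.raise_for_status()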
Run the Databricks CLI command to run the job, then check the output in the Spark driver logs to confirm that mount.err is not present.
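The CLI job commands wrap the Jobs REST API, so the same step can be sketched in Python against the 2.0 endpoints (again, the workspace URL and token are placeholders):

import json
import requests

HOST = "https://<your-workspace>.azuredatabricks.net"  # placeholder workspace URL
TOKEN = "<personal-access-token>"                      # placeholder token
headers = {"Authorization": f"Bearer {TOKEN}"}

# Create the job from the job.json definition
with open("job.json") as f:
    job_spec = json.load(f)
job_id = requests.post(
    f"{HOST}/api/2.0/jobs/create", headers=headers, json=job_spec
).json()["job_id"]

# Trigger a run of the job
run = requests.post(
    f"{HOST}/api/2.0/jobs/run-now", headers=headers, json={"job_id": job_id}
).json()
print("Started run:", run["run_id"])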