我尝试在 Databricks 中分析 500Mb 的数据集。这些数据存储在 Excel 文件中。我做的第一件事是从 Maven 安装 Spark Excel 包com.crealytics.spark.excel(最新版本 - 0.11.1)。
这些是集群的参数:
然后我在 Scala 笔记本中执行了以下代码:
val df_spc = spark.read
.format("com.crealytics.spark.excel")
.option("useHeader", "true")
.load("dbfs:/FileStore/tables/test.xlsx")
Run Code Online (Sandbox Code Playgroud)
但我收到了有关 Java 堆大小的错误,然后收到另一个错误“java.io.IOException:超出了 GC 开销限制”。然后我再次执行这段代码,运行 5 分钟后又出现错误:
火花驱动器意外停止并正在重新启动。您的笔记本将自动重新连接。
我不明白为什么会这样。事实上,对于分布式计算来说,数据集相当小,集群大小应该足以处理这些数据。我应该检查什么来解决它?
我能够建立与 Databricks FileStore 的连接DBFS并访问文件存储。
使用 Pyspark 读取、写入和转换数据是可能的,但是当我尝试使用本地 Python API(例如pathlib或OS模块)时,我无法通过 DBFS 文件系统的第一级
我可以使用一个神奇的命令:
%fs ls dbfs:\mnt\my_fs\...哪个工作完美并列出所有子目录?
但如果我这样做,它会作为返回值os.listdir('\dbfs\mnt\my_fs\')返回['mount.err']
我已经在新集群上进行了测试,结果是相同的
我在 Databricks Runtine 版本 6.1 和 Apache Spark 2.4.4 上使用 Python
有谁能提供建议吗?
连接脚本:
我使用 Databricks CLI 库来存储根据 databricks 文档格式化的凭据:
def initialise_connection(secrets_func):
configs = secrets_func()
# Check if the mount exists
bMountExists = False
for item in dbutils.fs.ls("/mnt/"):
if str(item.name) == r"WFM/":
bMountExists = True
# drop if exists to refresh credentials …Run Code Online (Sandbox Code Playgroud) 我有许多带有小部件的笔记本,当前“小部件更改时”的默认设置是“运行访问的命令”。有没有办法将其全局设置为“不执行任何操作”。
我可以在单个笔记本上执行此操作,但如果我关闭笔记本然后重新打开,它会将自身重置为“运行访问的命令”
我试过这个%fs ls dbfs:/mnt,但我想知道这会给我所有的挂载点吗?
我第一次尝试使用 Databricks CLI。每当我尝试使用 cli 进行某些操作时,它都会向我显示消息:“错误:b'Bad Request'”
这对于我能够进行身份验证的任何基于 cli 的命令都是相同的
(尝试使用错误的令牌并收到身份验证错误)
请找到调试跟踪如下:
命令:databricks fs ls --debug
result:
HTTP debugging enabled
send: b'GET /api/2.0/dbfs/list?path=dbfs%3A%2F HTTP/1.1\r\nHost: xxx-xxxxxxxxxxxxxxxx.xx.azuredatabricks.net\r\nuser-agent: databricks-cli-0.14.0-fs-ls-a2695361-282f-11eb-83c1-34f39aa616fb\r\nAccept-Encoding: gzip, deflate\r\nAccept: */*\r\nConnection: keep-alive\r\nAuthorization: Bearer \x16\r\nContent-Type: text/json\r\n\r\n'
reply: 'HTTP/1.1 400 Bad Request\r\n'
header: content-length: 11
header: content-type: text/plain<br>
header: date: Mon, 16 Nov 2020 17:17:42 GMT
header: server: databricks
header: connection: close
Error: b'Bad Request'
Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Azure Databricks 工作区读取 Azure SQL 实例上的数据,避免使用用户名/密码个人凭据进行自动、定期数据获取和分析。我认为使用托管标识可以完成这项工作,但它看起来不如 Azure Functions 或 Web 服务顺利。Databricks 支持此功能吗?
我需要 Databricks 实例中不存在的环境变量,例如 IDENTITY_ENDPOINT 和 IDENTITY_HEADER,遵循文档https://learn.microsoft.com/en-us/azure/app-service/overview-management-identity
任何见解将不胜感激!
azure azure-sql-database azure-managed-identity azure-databricks
我有一个表,其主键为多个列,因此我需要对多个列执行合并逻辑
DeltaTable.forPath(spark, "path")
.as("data")
.merge(
finalDf1.as("updates"),
"data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()
Run Code Online (Sandbox Code Playgroud)
当我检查数据计数时,它没有按预期更新。
有人可以帮我解决这个问题吗?
我正在尝试获取 python 笔记本中的工作区名称。我们有什么办法可以做到这一点吗?
例如:我的工作区名称是databricks-test.
我想在 python 笔记本的变量中捕获它
是否可以在 Azure 中检查 Databricks Runtime 的版本?
我收到这个错误
Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage
当我尝试在 PySpark 中创建 UDF 时。显然,它使用 CloudPickle 来序列化命令,但是,我知道 protobuf 消息包含C++实现,这意味着它不能被腌制。
我试图找到一种方法来覆盖CloudPickleSerializer,但是,我找不到方法。
这是我的示例代码:
from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F
def proto_deserialize(body):
msg = MyProtoMessage()
msg.ParseFromString(body)
return MessageToJson(msg)
from_proto = F.udf(lambda s: proto_deserialize(s))
base.withColumn("content", from_proto(F.col("body")))
Run Code Online (Sandbox Code Playgroud)
提前致谢。
azure-databricks ×10
databricks ×6
azure ×3
python ×2
scala ×2
apache-spark ×1
delta-lake ×1
excel ×1
pyspark ×1
version ×1