小编Ale*_*Ott的帖子

如何使用 JDBC 驱动程序连接到 Databricks Delta 表

如何使用 JDBC 连接到 Databricks Delta 表?

我尝试过连接 simba 驱动程序,但我在驱动程序类名称和 url 配置方面遇到了困难。

任何解决方案表示赞赏。我无法在此处粘贴代码作为其公司代码。

提前致谢。

java scala azure azure-databricks delta-lake

6
推荐指数
1
解决办法
1万
查看次数

Databricks 增量表与 SQL Server 增量表

Sql Delta 表和 Databricks Delta 表之间有区别吗?看起来对于 SQL,我们在概念上使用这个名称。存储Base表差异的表是Delta。数据块也一样吗?

sql-server databricks delta-lake

6
推荐指数
1
解决办法
2125
查看次数

在大量分区上处理 upsert 不够快

问题

我们在 ADLS Gen2 之上有一个 Delta Lake 设置,其中包含下表:

  • bronze.DeviceData: 按到达日期划分 ( Partition_Date)
  • silver.DeviceData:按事件日期和时间(Partition_DatePartition_Hour)分区

我们从事件中心摄取大量数据(每天超过 6 亿条记录)到bronze.DeviceData(仅追加)。然后我们以流方式处理新文件,并silver.DeviceData使用 delta MERGE 命令将它们更新插入(见下文)。

到达铜牌表的数据可以包含来自任何银牌分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的>90% 的数据都来自分区Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)。因此,为了更新数据,我们有以下两个 spark 作业:

  • “快速”:处理来自上述三个日期分区的数据。延迟在这里很重要,所以我们优先考虑这些数据
  • “慢”:处理其余部分(什么,但是这三个日期的分区)。延迟并不重要,但它应该在“合理”的时间内(我会说不超过一周)

现在我们来解决这个问题:虽然在“慢”工作中数据量少了很多,但它运行数天只是为了处理一天的慢青铜数据,有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,因此这些合并命令效率低下。

而且,随着时间的推移,这个缓慢的工作会变得越来越慢,因为它接触到的银色分区会增长。

问题

  1. 我们的分区方案和快速/慢速 Spark 作业设置通常是解决这个问题的好方法吗?
  2. 可以做些什么来改进这种设置?我们希望降低缓慢作业的成本和延迟,并找到一种方法,使其随着每天到达的数据量以青铜级而不是银级表的大小而增长

附加信息

  • 我们需要 MERGE 命令,因为某些上游服务可以重新处理历史数据,然后也应该更新 Silver 表
  • 银桌的架构:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark databricks delta-lake azure-data-lake-gen2

6
推荐指数
1
解决办法
441
查看次数

如何在 python 中从 mlflow 下载工件

我正在创建一个 mlflow 实验,它将逻辑回归模型以及指标和工件记录在一起。

import mlflow
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support

with mlflow.start_run(run_name=run_name, experiment_id=experiment_id):

        logreg = LogisticRegression()
        logreg.fit(x_train, y_train)
        print('training over', flush=True)
        y_pred = logreg.predict(x_test)
        mlflow.sklearn.log_model(logreg, "model")
   
        mlflow.log_metric("f1", precision_recall_fscore_support(y_test, y_pred, average='weighted')[2])
        mlflow.log_artifact(x_train.to_csv('train.csv')
Run Code Online (Sandbox Code Playgroud)

对于某些数据 ( x_train, y_train, x_test, y_test)

有没有办法访问此 run_name 的特定实验 ID 的工件并读取train.csvmodel

python python-3.x mlflow mlops

6
推荐指数
1
解决办法
2万
查看次数

从存储库 Databricks 中的另一个笔记本运行笔记本

我有一个笔记本,其功能位于存储库文件夹中,我试图在另一个笔记本中运行它。

通常我可以这样运行它:%run /Users/name/project/file_name

因此,我将这两个文件(function_notebook、processed_notebook)克隆到 Databricks 中的 Repo 中。

当我尝试复制刚刚克隆的路径时,仅出现此选项:Copy File Path relative to Root

但是,在工作区用户文件夹中,选项是Copy File Path

显然我不太明白相对路径和工作空间路径之间的区别。

如何运行已在存储库中克隆的笔记本?

等级制度:

RepoName(有 2 个文件夹):

  1. 文件夹1Notebook1

  2. 文件夹2Notebook2

Notebook1想要奔跑Notebook2

%run ../Folder2/Notebook2
Run Code Online (Sandbox Code Playgroud)

databricks databricks-repos

6
推荐指数
1
解决办法
8433
查看次数

Koalas/pyspark 找不到数据源:delta

当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。

如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。

失败的考拉代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789],
                        'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

编辑:不让考拉 Spark 到 Delta 代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789], …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks delta-lake spark-koalas

6
推荐指数
1
解决办法
1万
查看次数

Trigger.AvailableNow 用于 PySpark (Databricks) 中的 Delta 源流查询

Databricks文档中的所有示例均采用 Scala 语言。无法从 PySpark 找到如何使用此触发器类型。是否有等效的 API 或解决方法?

pyspark databricks spark-structured-streaming delta-lake

6
推荐指数
1
解决办法
1万
查看次数

Databricks DELTA CTAS 与使用 %sql 的 LOCATION

DELTA不具有CREATE TABLE LIKE。它确实有CTAS

我只想复制表的定义LOCATION,但还要指定.

例如,这不起作用:

CREATE TABLE IF NOT EXISTS NEW_CUSTOMER_FEED 
AS SELECT * from NEW_CUSTOMER_FEED WHERE 1 = 0 
LOCATION '/atRest/data'
Run Code Online (Sandbox Code Playgroud)

我缺少什么?

databricks delta-lake databricks-sql

6
推荐指数
1
解决办法
3811
查看次数

如何在不读取内容的情况下获取Delta表的模式?

我有一个包含数百万行和多种类型的列的增量表,包括。嵌套结构。我想在运行时创建增量表的空 DataFrame 克隆- 即相同的架构,没有行。

我可以在不读取表的任何内容的情况下读取架构(以便我可以根据架构创建一个空的 DataFrame)吗?我认为这是可能的,因为存在增量事务日志并且增量需要快速访问表模式本身

我尝试过的:

  • df.schema- 增量表加载后立即访问schema也需要几分钟。
  • limit(0)-limit(0)加载后立即调用仍然需要几分钟。
  • limit(0).cache()-limit有时会在计划中移动,所以我也尝试添加cache“固定其位置”。

还有其他选择吗?仅访问事务日志 JSON 并从最新事务中读取架构是否正确?(鉴于我们

上下文:我想在我们的 CI 中添加一个步骤,在实际使用数据运行之前检查代码和围绕模式的各种假设。

pyspark databricks delta-lake

6
推荐指数
1
解决办法
1万
查看次数

如何将 DLT 目标表定向到 Unity Catalog Metastore

这个问题非常简单。似乎在 DLT 中,您可以定义输出表名称,如下所示:

@dlt.table(name="my_table_name")
def my_pipeline():
  ...
Run Code Online (Sandbox Code Playgroud)

这会写入 hive_metastore 目录,但如何针对不同的目录自定义它?

pyspark databricks delta-live-tables databricks-unity-catalog

6
推荐指数
1
解决办法
2529
查看次数