标签: databricks

如何计算databricks笔记本中命令单元所花费的执行时间

我需要知道执行每个笔记本中的命令单元列表所需的时间。Databricks 显示“命令花费了 x 秒数”来执行。与显示的执行时间类似,我需要捕获执行笔记本中所有命令所需的时间。

在此输入图像描述

在此输入图像描述

scala databricks

6
推荐指数
1
解决办法
6392
查看次数

如何在 Databricks Python Notebook 中运行/执行输入单元

我在Databricks笔记本中编写了以下代码

name = input("Please enter your name: ")
age = input("How old are you, {0}?".format(name))
print(age)
Run Code Online (Sandbox Code Playgroud)

正如您所猜测的,运行单元格后,系统会要求我“请输入您的姓名:”问题是我不知道在哪里输入。如果这是用 intelliJ IDEA 或 IDLE 编写的,我将获得一个单独的窗口来输入我的名字。但是,使用 Databricks 笔记本,即使我在不​​同的单元格中输入答案,它似乎也在不断等待输入,请参见图像:

在哪里输入

我真的应该知道这个问题的答案

databricks azure-databricks

6
推荐指数
2
解决办法
1万
查看次数

为什么配置 Databricks Connect 后“databricks-connect 测试”不起作用?

我想使用 IntelliJ IDEA 直接在集群中运行 Spark 进程,因此我遵循下一个文档https://docs.azuredatabricks.net/user-guide/dev-tools/db-connect.html

配置完所有内容后,我运行databricks-connect test但没有获得文档所述的 Scala REPL。

在此输入图像描述

这是我的集群配置

在此输入图像描述

intellij-idea apache-spark databricks azure-databricks

6
推荐指数
1
解决办法
1万
查看次数

如何在 Databricks 上使用 Dask

我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。

dask databricks dask-distributed azure-databricks

6
推荐指数
3
解决办法
5444
查看次数

如何在databricks中的SQL语句中使用变量?

我想在 where 子句中使用带有两个变量的 WHERE 语句。我对此进行了研究,了解如何在 Databricks 中的 SQL 语句中使用变量以及使用 Python 插入变量,但不起作用。我尝试实施所提供的解决方案,但它不起作用。

a= 17091990
b = 30091990

df = spark.sql(' SELECT * FROM table WHERE date between "a" AND "b" ')
Run Code Online (Sandbox Code Playgroud)

databricks

6
推荐指数
1
解决办法
1万
查看次数

databricks/spark SQL 中的动态枢轴?

在 SQL(databricks/spark SQL)中进行透视时,有没有办法动态设置 for-in 的“in”部分?

例如,这段代码:

select *
from (select office, country, revenue from sales)
pivot (
  sum(revenue)
  for country in ('US', 'CA', 'UK') 
) 
Run Code Online (Sandbox Code Playgroud)

...工作正常,但该country列每个月都会有不同的值,所以我不想每次都查找并重新编写代码。我尝试将countryselect distinct country from sales放在那里,但这些不起作用。有任何想法吗?

sql pivot apache-spark-sql databricks

6
推荐指数
1
解决办法
4029
查看次数

Delta Lake:更新插入内部如何工作?

在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹中。

然后,我们定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。

在此合并逻辑期间:

1)我们将“增量数据”加载为DataFrame df1

2)将当前的“快照表”加载为DataFrame df2

3) 合并 df1 和 df2 去重复 ID 并获取最新版本的行(使用 update_timestamp 列)

此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,该内存可能非常巨大,具体取决于数据库。

我注意到在 Delta Lake 中,使用以下代码完成类似的操作:

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
Run Code Online (Sandbox Code Playgroud)

在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。

我的问题

1)合并/更新插入内部如何工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中? …

apache-spark databricks delta-lake

6
推荐指数
1
解决办法
3351
查看次数

Databricks:如何从 R Dataframe 切换到 Pandas Dataframe(同一笔记本中的 R 到 python)

我正在 Databricks 笔记本中编写 R 代码,该代码在 R 中执行多项操作。清理数据帧后,我想使用“%python”在 python 单元中调用它,因此使用 python 代码继续对数据帧进行操作。

因此,我想在 python 块内将我的 R 数据框转换为 Pandas 数据框。有人知道怎么做这个吗?谢谢!

python r dataframe pandas databricks

6
推荐指数
2
解决办法
6308
查看次数

将 C# 应用程序连接到 Azure Databricks

我目前正在开发一个项目,我们将数据存储在 Azure Datalake 上。Datalake 连接到 Azure Databricks。

该要求要求 Azure Databricks 连接到 C# 应用程序,以便能够运行查询并从 C# 应用程序获取所有结果。我们目前解决该问题的方法是在 Databricks 上创建一个工作区,其中包含许多需要执行的查询。我们创建了一个链接到上述工作区的作业。从 C# 应用程序中,我们调用本文档中列出的许多 API来调用作业实例并等待其执行。但是,我无法从文档中列出的任何 API 中提取结果。

我的问题是,我们采取了正确的方法还是有什么我们没有看到的?如果这是可行的方法,那么您在从 C# 应用程序在 Azure Databricks 上成功运行的作业中提取结果方面有何经验。

c# azure azure-data-lake databricks azure-databricks

6
推荐指数
1
解决办法
1万
查看次数

异常:org.apache.spark.sql.delta.ConcurrentAppendException:文件通过并发更新添加到表的根目录

我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。

创建了许多小镶木地板文件。

按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。

    val path = "..."
    val numFiles = 16
    
    spark.read
     .format("delta")
     .load(path)
     .repartition(numFiles)
     .write
     .option("dataChange", "false")
     .format("delta")
     .mode("overwrite")
     .save(path)
Run Code Online (Sandbox Code Playgroud)

每次压缩作业运行时,流作业都会出现以下异常:

org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)

我尝试将以下配置参数添加到流作业中:

spark.databricks.delta.retryWriteConflict.enabled = true  # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3  # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)

这没有帮助。

知道如何解决这个问题吗?

parquet spark-streaming databricks delta-lake

6
推荐指数
1
解决办法
1万
查看次数