标签: databricks

databricks/spark SQL 中的动态枢轴?

在 SQL(databricks/spark SQL)中进行透视时,有没有办法动态设置 for-in 的“in”部分?

例如,这段代码:

select *
from (select office, country, revenue from sales)
pivot (
  sum(revenue)
  for country in ('US', 'CA', 'UK') 
) 
Run Code Online (Sandbox Code Playgroud)

...工作正常,但该country列每个月都会有不同的值,所以我不想每次都查找并重新编写代码。我尝试将countryselect distinct country from sales放在那里,但这些不起作用。有任何想法吗?

sql pivot apache-spark-sql databricks

6
推荐指数
1
解决办法
4029
查看次数

Delta Lake:更新插入内部如何工作?

在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹中。

然后,我们定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。

在此合并逻辑期间:

1)我们将“增量数据”加载为DataFrame df1

2)将当前的“快照表”加载为DataFrame df2

3) 合并 df1 和 df2 去重复 ID 并获取最新版本的行(使用 update_timestamp 列)

此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,该内存可能非常巨大,具体取决于数据库。

我注意到在 Delta Lake 中,使用以下代码完成类似的操作:

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
Run Code Online (Sandbox Code Playgroud)

在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。

我的问题

1)合并/更新插入内部如何工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中? …

apache-spark databricks delta-lake

6
推荐指数
1
解决办法
3351
查看次数

Databricks:如何从 R Dataframe 切换到 Pandas Dataframe(同一笔记本中的 R 到 python)

我正在 Databricks 笔记本中编写 R 代码,该代码在 R 中执行多项操作。清理数据帧后,我想使用“%python”在 python 单元中调用它,因此使用 python 代码继续对数据帧进行操作。

因此,我想在 python 块内将我的 R 数据框转换为 Pandas 数据框。有人知道怎么做这个吗?谢谢!

python r dataframe pandas databricks

6
推荐指数
2
解决办法
6308
查看次数

将 C# 应用程序连接到 Azure Databricks

我目前正在开发一个项目,我们将数据存储在 Azure Datalake 上。Datalake 连接到 Azure Databricks。

该要求要求 Azure Databricks 连接到 C# 应用程序,以便能够运行查询并从 C# 应用程序获取所有结果。我们目前解决该问题的方法是在 Databricks 上创建一个工作区,其中包含许多需要执行的查询。我们创建了一个链接到上述工作区的作业。从 C# 应用程序中,我们调用本文档中列出的许多 API来调用作业实例并等待其执行。但是,我无法从文档中列出的任何 API 中提取结果。

我的问题是,我们采取了正确的方法还是有什么我们没有看到的?如果这是可行的方法,那么您在从 C# 应用程序在 Azure Databricks 上成功运行的作业中提取结果方面有何经验。

c# azure azure-data-lake databricks azure-databricks

6
推荐指数
1
解决办法
1万
查看次数

Databricks 文件系统 - %sh ls 与 %fs ls

我有一些文件位于%sh ls,我想将这些文件移动到 databricks 的文件系统中(使它们在 中可见%fs ls)。

%sh ls你们中有人知道和之间有什么区别%fs ls,以及如何在它们之间移动文件吗?

我知道我们可以利用它dbutils.fs.cp来移动已经就位的文件%fs ls

任何帮助或指示表示赞赏。

filesystems databricks

6
推荐指数
1
解决办法
7740
查看次数

如何向已完成的 MLflow 运行添加更多指标?

MLflow 运行完成后,外部脚本可以使用 pythonmlflow客户端和mlflow.get_run(run_id)方法访问其参数和指标,但Run返回的对象get_run似乎是只读的。

具体来说,.log_param .log_metric, 或.log_artifact不能用在 所返回的对象上get_run,从而引发如下错误:

AttributeError: 'Run' object has no attribute 'log_param'
Run Code Online (Sandbox Code Playgroud)

.log_*如果我们尝试在 上运行任何方法mlflow,它会将它们记录到实验中使用自动生成的运行 ID 的新运行中Default

例子:

final_model_mlflow_run = mlflow.get_run(final_model_mlflow_run_id)

with mlflow.ActiveRun(run=final_model_mlflow_run) as myrun:    
    
    # this read operation uses correct run
    run_id = myrun.info.run_id
    print(run_id)
    
    # this write operation writes to a new run 
    # (with auto-generated random run ID) 
    # in the "Default" experiment (with exp. ID of …
Run Code Online (Sandbox Code Playgroud)

python databricks mlflow

6
推荐指数
1
解决办法
3895
查看次数

使用 Spark(Databricks) 的并行 REST API 请求

我想利用 Spark(它在 Databricks 上运行,我正在使用 PySpark)向 REST API 发送并行请求。现在我可能面临两种情况:

  • REST API 1:返回 ~MB 量级的数据
  • REST API 2:返回~KB量级的数据。

关于如何在节点之间分配请求有什么建议吗?

谢谢!

rest apache-spark pyspark databricks azure-databricks

6
推荐指数
1
解决办法
6929
查看次数

使用 python 截断 Databricks 中的增量表

这里给出了 Python 和 SQL 的 Delta 表删除操作,并给出了使用 SQL 的截断操作。但我找不到Python truncate table 的文档。

Databricks 中的增量表如何实现?

python pyspark databricks delta-lake

6
推荐指数
1
解决办法
7810
查看次数

如何在 Databricks 中使用 Selenium 并访问下载的文件并将其移动到安装的存储并保持 Chrome 和 ChromeDriver 版本同步?

我看过几篇关于在 Databricks 中使用 Selenium%sh来安装 Chrome 驱动程序和 Chrome 的帖子。这对我来说效果很好,但是当我需要下载文件时我遇到了很多麻烦。该文件会下载,但我在 databricks 的文件系统中找不到它。即使我在将 Chrome 实例化到 Azure Blob 存储上的挂载文件夹时更改了下载路径,下载后文件也不会放置在那里。还有一个问题是,如何在不手动更改版本号的情况下自动保持 Chrome 浏览器和 ChromeDriver 版本同步。

以下链接显示了有同样问题但没有明确答案的人:

https://forums.databricks.com/questions/19376/if-my-notebook-downloads-a-file-from-a-website-by.html

https://forums.databricks.com/questions/45388/selenium-in-databricks-with-add-experimental-optio.html

当我使用 Selenium Python 进行 Web 自动化时,有没有办法确定文件在 Azure Databricks 中的下载位置?

还有一些人在努力让 Selenium 正常运行: https://forums.databricks.com/questions/14814/selenium-in-databricks.html

不在路径错误中: https://webcache.googleusercontent.com/search? q=cache:NrvVKo4LLdIJ:/sf/ask/4053306071/ -databricks+&cd=5&hl=en&ct=clnk&gl=us

是否有在 Databricks 上使用 Selenium 和管理下载文件的明确指南?如何让 Chrome 浏览器和 ChromeDriver 版本自动保持同步?

python selenium pyspark databricks azure-databricks

6
推荐指数
1
解决办法
1万
查看次数

Spark分区大小大于执行器内存

我有四个问题。假设在 Spark 中我有 3 个工作节点。每个工作节点有 3 个执行器,每个执行器有 3 个核心。每个执行器有 5 GB 内存。(总共 6 个执行器、27 个核心和 15GB 内存)。如果出现以下情况会发生什么:

  • 我有 30 个数据分区。每个分区的大小为 6 GB。最佳情况下,分区数量必须等于核心数量,因为每个核心执行一个分区/任务(每个分区一个任务)。现在在这种情况下,由于分区大小大于可用执行器内存,每个执行器核心将如何处理分区?注意:我没有调用cache()或persist(),只是我在rdd上应用了一些狭窄的转换,例如map()和filter()。

  • Spark 会自动尝试将分区存储在磁盘上吗?(我没有调用cache()或persist(),而只是在调用操作后发生转换)

  • 由于我的分区 (30) 大于可用核心数 (27),因此我的集群最多可以处理 27 个分区,那么剩余 3 个分区会发生什么情况?他们会等待被占用的核心被释放吗?

  • 如果我调用 persist() ,其存储级别设置为 MEMORY_AND_DISK,那么如果分区大小大于内存,它会将数据溢出到磁盘吗?这些数据将存储在哪个磁盘上?工作节点的外部硬盘?

partitioning apache-spark rdd pyspark databricks

6
推荐指数
1
解决办法
4571
查看次数