我在Databricks笔记本中编写了以下代码
name = input("Please enter your name: ")
age = input("How old are you, {0}?".format(name))
print(age)
Run Code Online (Sandbox Code Playgroud)
正如您所猜测的,运行单元格后,系统会要求我“请输入您的姓名:”问题是我不知道在哪里输入。如果这是用 intelliJ IDEA 或 IDLE 编写的,我将获得一个单独的窗口来输入我的名字。但是,使用 Databricks 笔记本,即使我在不同的单元格中输入答案,它似乎也在不断等待输入,请参见图像:
我真的应该知道这个问题的答案
我想使用 IntelliJ IDEA 直接在集群中运行 Spark 进程,因此我遵循下一个文档https://docs.azuredatabricks.net/user-guide/dev-tools/db-connect.html
配置完所有内容后,我运行databricks-connect test但没有获得文档所述的 Scala REPL。
这是我的集群配置
我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。
我想在 where 子句中使用带有两个变量的 WHERE 语句。我对此进行了研究,了解如何在 Databricks 中的 SQL 语句中使用变量以及使用 Python 插入变量,但不起作用。我尝试实施所提供的解决方案,但它不起作用。
a= 17091990
b = 30091990
df = spark.sql(' SELECT * FROM table WHERE date between "a" AND "b" ')
Run Code Online (Sandbox Code Playgroud) 在 SQL(databricks/spark SQL)中进行透视时,有没有办法动态设置 for-in 的“in”部分?
例如,这段代码:
select *
from (select office, country, revenue from sales)
pivot (
sum(revenue)
for country in ('US', 'CA', 'UK')
)
Run Code Online (Sandbox Code Playgroud)
...工作正常,但该country列每个月都会有不同的值,所以我不想每次都查找并重新编写代码。我尝试将country和 select distinct country from sales放在那里,但这些不起作用。有任何想法吗?
在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹中。
然后,我们定期运行 Spark 作业,将这些“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。
在此合并逻辑期间:
1)我们将“增量数据”加载为DataFrame df1
2)将当前的“快照表”加载为DataFrame df2
3) 合并 df1 和 df2 去重复 ID 并获取最新版本的行(使用 update_timestamp 列)
此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,该内存可能非常巨大,具体取决于数据库。
我注意到在 Delta Lake 中,使用以下代码完成类似的操作:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Run Code Online (Sandbox Code Playgroud)
在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。
我的问题:
1)合并/更新插入内部如何工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中? …
我正在 Databricks 笔记本中编写 R 代码,该代码在 R 中执行多项操作。清理数据帧后,我想使用“%python”在 python 单元中调用它,因此使用 python 代码继续对数据帧进行操作。
因此,我想在 python 块内将我的 R 数据框转换为 Pandas 数据框。有人知道怎么做这个吗?谢谢!
我目前正在开发一个项目,我们将数据存储在 Azure Datalake 上。Datalake 连接到 Azure Databricks。
该要求要求 Azure Databricks 连接到 C# 应用程序,以便能够运行查询并从 C# 应用程序获取所有结果。我们目前解决该问题的方法是在 Databricks 上创建一个工作区,其中包含许多需要执行的查询。我们创建了一个链接到上述工作区的作业。从 C# 应用程序中,我们调用本文档中列出的许多 API来调用作业实例并等待其执行。但是,我无法从文档中列出的任何 API 中提取结果。
我的问题是,我们采取了正确的方法还是有什么我们没有看到的?如果这是可行的方法,那么您在从 C# 应用程序在 Azure Databricks 上成功运行的作业中提取结果方面有何经验。
我有一个简单的 Spark 作业,将数据流式传输到 Delta 表。该表非常小并且没有分区。
创建了许多小镶木地板文件。
按照文档(https://docs.delta.io/1.0.0/best-practices.html)中的建议,我添加了每天运行一次的压缩作业。
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
Run Code Online (Sandbox Code Playgroud)
每次压缩作业运行时,流作业都会出现以下异常:
org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Run Code Online (Sandbox Code Playgroud)
我尝试将以下配置参数添加到流作业中:
spark.databricks.delta.retryWriteConflict.enabled = true # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3 # optionally limit the maximum amout of retries
Run Code Online (Sandbox Code Playgroud)
这没有帮助。
知道如何解决这个问题吗?
databricks ×10
apache-spark ×2
delta-lake ×2
azure ×1
c# ×1
dask ×1
dataframe ×1
pandas ×1
parquet ×1
pivot ×1
python ×1
r ×1
scala ×1
sql ×1