我的场景
问题:
inData = spark.readstream().format("eventhub")
udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)
filter1 = udfdata.filter("column =='filter1'")
filter 2 = udfdata.filter("column =='filter2'")
# write filter1 to two differnt sinks
filter1.writestream().format(delta).start(table1)
filter1.writestream().format(eventhub).start()
# write filter2 to two differnt sinks
filter2.writestream().format(delta).start(table2)
filter2.writestream().format(eventhub).start()
Run Code Online (Sandbox Code Playgroud) DataBricks dbutils 库需要在 eclipse 或任何其他 IDE 中使用。诸如 dbutils.secrets.get 之类的方法无法从 Notebook 外部的 SecretUtil API 中使用。在这种情况下我们可以使用com.databricksjar
我正在尝试删除使用 writestream 创建的 Delta Lake 表。我尝试删除表但失败
#table created as
df.writestream().outputmode("append").format("delta").start("/mnt/mytable")
#attempt to drop table
spark.sql("drop table '/mnt/mytable'")
Run Code Online (Sandbox Code Playgroud) 是否可以向 databricks 作业提交/配置 Spark python 脚本 (.py) 文件?
我在 Pycharm IDE 中进行开发,然后将代码推送/提交到我们的 gitlab 存储库。我的要求是当 python 脚本移动到 GitLab 主分支时,我需要在 databricks 集群中创建新作业。
如果可以使用 gitlab.yml 脚本在 python 脚本上创建 databricks 作业,我想得到一些建议?
在databricks Job UI中,我可以看到可以使用的spark jar或笔记本,但想知道我们是否可以提供一个python文件。
谢谢,
尤瓦
我正在尝试将使用 Azure DevOps 的 terraform 部署生成的 databricks 工作区名称作为变量传递到另一个步骤,但不知道该怎么做。
所以我在我的output.tf中定义了输出
output "workspace_name" {
value = azurerm_databricks_workspace.databricks.name
}
Run Code Online (Sandbox Code Playgroud)
我可以看到输出:
通过 stackoverflow,有一个解决方案可以自动将所有输出作为变量:
1-配置输出变量:
2-配置powershell脚本来获取它们并填充它
3-现在怎么办?变量名称是什么?如何在 Databricks Bearer Token 解决方案中使用它?
azure azure-devops azure-pipelines terraform-provider-azure azure-databricks
我正在尝试使用以下代码使用 azure databricks 读取 xml 文件:
df = spark.read.format('com.databricks.spark.xml').options(rowTag='book').load(' /FileStore/tables/sample.xml')
Run Code Online (Sandbox Code Playgroud)
我安装了spark-xml:0.1.1-s_2.10包:
但我收到以下错误:
Py4JJavaError: An error occurred while calling o285.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:733)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:276)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.xml.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$14.apply(DataSource.scala:710)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$14.apply(DataSource.scala:710)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:710)
at …Run Code Online (Sandbox Code Playgroud) 我正在研究 Azure Databrick。我在笔记本上运行 python 脚本并从 SQL 获取数据。我尝试将日期时间列拆分为日期和时间列。这是 python 的语法:
pushdown_query = "(SELECT * FROM STAGE.OutagesAndInterruptions) int_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
df['INTERRUPTION_DATE']=df['INTERRUPTION_TIME'].dt.date
Run Code Online (Sandbox Code Playgroud)
df['INTERRUPTION_TIME'] 看起来像:
+-------------------+
| INTERRUPTION_TIME|
+-------------------+
|1997-05-12 09:57:00|
|1998-03-08 13:00:00|
|1998-02-26 13:00:00|
|1998-02-26 13:00:00|
|1998-03-03 10:04:00|
|1998-05-20 09:27:00|
|1998-11-21 08:51:00|
|1998-11-27 08:44:00|
|1998-10-19 01:19:00|
|1998-10-19 01:44:00|
|2000-03-13 07:00:00|
|2000-03-19 07:30:00|
|2000-08-04 12:55:00|
|2002-09-30 18:11:00|
|2002-09-30 18:11:00|
|2002-05-06 09:22:00|
|2002-01-16 13:15:00|
|2003-01-08 15:46:00|
|2003-02-04 10:25:00|
|2003-02-04 10:25:00|
+-------------------+
Run Code Online (Sandbox Code Playgroud)
当我运行代码时,它抛出一条错误消息:
TypeError: 'DataFrame' object does not support item assignment
---------------------------------------------------------------------------
TypeError …Run Code Online (Sandbox Code Playgroud) 我想从 databricks 中的增量文件中删除数据。我使用这些命令
例如:
PR=spark.read.format('delta').options(header=True).load('/mnt/landing/Base_Tables/EventHistory/')
PR.write.format("delta").mode('overwrite').saveAsTable('PR')
spark.sql('delete from PR where PR_Number=4600')
Run Code Online (Sandbox Code Playgroud)
这是从表中删除数据,但不是从实际的增量文件中删除数据。我想删除文件中的数据而不使用合并操作,因为连接条件不匹配。谁能帮我解决这个问题。
谢谢
我有一个 Dataframe,我想通过 Databricks Notebook 中的 select 语句中的小部件动态传递列名称。我该怎么做?
我正在使用下面的代码
df1 = spark.sql("select * from tableraw")
Run Code Online (Sandbox Code Playgroud)
其中df1有列“tablename”和“layer”
df = df1.select("tablename", "layer")
Run Code Online (Sandbox Code Playgroud)
现在,我们的要求是使用小部件的值来选择这些列,如下所示:
df = df1.select(dbutils.widget.get("tablename"), dbutils.widget.get("datalayer"))
Run Code Online (Sandbox Code Playgroud) 我试图在 Azure 数据块中运行以下查询。
query=s"""WITH pre_file_user AS(
SELECT id,
typeid,
CASE when dttm is null or dttm='' then cast('1900-01-01 00:00:00.000' as timestamp)
else cast(dttm as timestamp)
end as dttm
from dde_pre_file_user_supp
)"""
spark.sql(query)
Run Code Online (Sandbox Code Playgroud)
然后我收到以下错误
ParseException:在输入“with pre_file_users AS”时没有可行的替代方案(\n选择id,\n typepid,以防\n当dttm为空或dttm =''然后强制转换('1900-01-01 00:00:00.000作为时间戳)\n 以 dttm\n 结尾,来自 dde_pre_file_user_supp\n )'
我可以在数据块中使用WITH子句吗还是有其他选择吗?
azure-databricks ×10
pyspark ×5
databricks ×4
apache-spark ×3
azure ×3
sql ×3
delta-lake ×2
azure-devops ×1
gitlab ×1
gitlab-api ×1
python ×1
python-3.x ×1
scala ×1