标签: azure-databricks

结构化流写入多个流

我的场景

  1. 从流中获取数据并调用返回 json 字符串的 UDF。JSON 字符串中的属性之一是 UniqueId,UDF 生成为 guid.newGuid() (C#)。
    1. UDF 的 DataFrame 输出基于某些 fiter 写入多个流/接收器。

问题:

  1. 每个接收器都会获取由 UDF 生成的 UniqueId 的新值。我如何为所有接收器维护相同的 UniqueId。
  2. 如果每个接收器获得不同的 UniqueId 值,这是否意味着每个接收器都会多次调用我的 UDF?
  3. 如果 UDF 被调用两次,有什么选项可以让它调用一次,然后将相同的数据写入不同的接收器
inData = spark.readstream().format("eventhub")

udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)

filter1 =  udfdata.filter("column =='filter1'")
filter 2 = udfdata.filter("column =='filter2'") 

# write filter1 to two differnt sinks
filter1.writestream().format(delta).start(table1)
filter1.writestream().format(eventhub).start()

# write filter2 to two differnt sinks
filter2.writestream().format(delta).start(table2)
filter2.writestream().format(eventhub).start()
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming azure-databricks

4
推荐指数
1
解决办法
5663
查看次数

如何在笔记本之外使用DataBricks dbutils jar?

DataBricks dbutils 库需要在 eclipse 或任何其他 IDE 中使用。诸如 dbutils.secrets.get 之类的方法无法从 Notebook 外部的 SecretUtil API 中使用。在这种情况下我们可以使用com.databricksjar

apache-spark databricks azure-databricks

4
推荐指数
1
解决办法
4311
查看次数

如何删除非托管 Delta Lake 表

我正在尝试删除使用 writestream 创建的 Delta Lake 表。我尝试删除表但失败

#table created as
df.writestream().outputmode("append").format("delta").start("/mnt/mytable")

#attempt to drop table
spark.sql("drop table '/mnt/mytable'")
Run Code Online (Sandbox Code Playgroud)

databricks azure-databricks delta-lake

4
推荐指数
1
解决办法
6339
查看次数

将 Python 脚本提交到 Databricks JOB

是否可以向 databricks 作业提交/配置 Spark python 脚本 (.py) 文件?

我在 Pycharm IDE 中进行开发,然后将代码推送/提交到我们的 gitlab 存储库。我的要求是当 python 脚本移动到 GitLab 主分支时,我需要在 databricks 集群中创建新作业。

如果可以使用 gitlab.yml 脚本在 python 脚本上创建 databricks 作业,我想得到一些建议?

在databricks Job UI中,我可以看到可以使用的spark jar或笔记本,但想知道我们是否可以提供一个python文件。

谢谢,

尤瓦

gitlab pyspark gitlab-api databricks azure-databricks

4
推荐指数
1
解决办法
7036
查看次数

如何使用 terraform 输出作为 Azure DevOps 管道中的变量

我正在尝试将使用 Azure DevOps 的 terraform 部署生成的 databricks 工作区名称作为变量传递到另一个步骤,但不知道该怎么做。

所以我在我的output.tf中定义了输出

output "workspace_name" {
  value = azurerm_databricks_workspace.databricks.name
}
Run Code Online (Sandbox Code Playgroud)

我可以看到输出:

在此输入图像描述

通过 stackoverflow,有一个解决方案可以自动将所有输出作为变量:

1-配置输出变量:

在此输入图像描述

2-配置powershell脚本来获取它们并填充它

电源外壳

3-现在怎么办?变量名称是什么?如何在 Databricks Bearer Token 解决方案中使用它?

azure azure-devops azure-pipelines terraform-provider-azure azure-databricks

4
推荐指数
1
解决办法
5139
查看次数

天蓝色数据砖:java.lang.ClassNotFoundException:无法找到数据源:com.databricks.spark.xml

我正在尝试使用以下代码使用 azure databricks 读取 xml 文件:

df = spark.read.format('com.databricks.spark.xml').options(rowTag='book').load(' /FileStore/tables/sample.xml')
Run Code Online (Sandbox Code Playgroud)

我安装了spark-xml:0.1.1-s_2.10包:

在此输入图像描述

但我收到以下错误:

Py4JJavaError: An error occurred while calling o285.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:733)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:276)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.xml.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$14.apply(DataSource.scala:710)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$14.apply(DataSource.scala:710)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:710)
    at …
Run Code Online (Sandbox Code Playgroud)

azure pyspark azure-databricks

4
推荐指数
1
解决办法
1万
查看次数

Python Azure Databrick:“DataFrame”对象不支持项目分配

我正在研究 Azure Databrick。我在笔记本上运行 python 脚本并从 SQL 获取数据。我尝试将日期时间列拆分为日期和时间列。这是 python 的语法:

    pushdown_query = "(SELECT * FROM STAGE.OutagesAndInterruptions) int_alias"
    df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

    df['INTERRUPTION_DATE']=df['INTERRUPTION_TIME'].dt.date
Run Code Online (Sandbox Code Playgroud)

df['INTERRUPTION_TIME'] 看起来像:

+-------------------+
|  INTERRUPTION_TIME|
+-------------------+
|1997-05-12 09:57:00|
|1998-03-08 13:00:00|
|1998-02-26 13:00:00|
|1998-02-26 13:00:00|
|1998-03-03 10:04:00|
|1998-05-20 09:27:00|
|1998-11-21 08:51:00|
|1998-11-27 08:44:00|
|1998-10-19 01:19:00|
|1998-10-19 01:44:00|
|2000-03-13 07:00:00|
|2000-03-19 07:30:00|
|2000-08-04 12:55:00|
|2002-09-30 18:11:00|
|2002-09-30 18:11:00|
|2002-05-06 09:22:00|
|2002-01-16 13:15:00|
|2003-01-08 15:46:00|
|2003-02-04 10:25:00|
|2003-02-04 10:25:00|
+-------------------+
Run Code Online (Sandbox Code Playgroud)

当我运行代码时,它抛出一条错误消息:

TypeError: 'DataFrame' object does not support item assignment
---------------------------------------------------------------------------
TypeError …
Run Code Online (Sandbox Code Playgroud)

python azure python-3.x azure-databricks

4
推荐指数
1
解决办法
2万
查看次数

如何从databricks中的增量文件中删除数据?

我想从 databricks 中的增量文件中删除数据。我使用这些命令
例如:

PR=spark.read.format('delta').options(header=True).load('/mnt/landing/Base_Tables/EventHistory/')
PR.write.format("delta").mode('overwrite').saveAsTable('PR')
spark.sql('delete from PR where PR_Number=4600')
Run Code Online (Sandbox Code Playgroud)

这是从表中删除数据,但不是从实际的增量文件中删除数据。我想删除文件中的数据而不使用合并操作,因为连接条件不匹配。谁能帮我解决这个问题。

谢谢

sql apache-spark-sql pyspark azure-databricks delta-lake

4
推荐指数
1
解决办法
2万
查看次数

如何使用小部件在 Dataframe select 语句中传递动态列名称

我有一个 Dataframe,我想通过 Databricks Notebook 中的 select 语句中的小部件动态传递列名称。我该怎么做?

我正在使用下面的代码

df1 = spark.sql("select * from tableraw")
Run Code Online (Sandbox Code Playgroud)

其中df1有列“tablename”和“layer”

df = df1.select("tablename", "layer")
Run Code Online (Sandbox Code Playgroud)

现在,我们的要求是使用小部件的值来选择这些列,如下所示:

df = df1.select(dbutils.widget.get("tablename"), dbutils.widget.get("datalayer"))
Run Code Online (Sandbox Code Playgroud)

sql scala pyspark databricks azure-databricks

4
推荐指数
1
解决办法
2万
查看次数

ParseExpection:输入时没有可行的替代方案

我试图在 Azure 数据块中运行以下查询。

query=s"""WITH pre_file_user AS(
            SELECT id,
            typeid,
          CASE when dttm is null or dttm='' then cast('1900-01-01 00:00:00.000' as timestamp)
          else cast(dttm as timestamp)
          end as dttm
          from dde_pre_file_user_supp
)"""

spark.sql(query)
Run Code Online (Sandbox Code Playgroud)

然后我收到以下错误

ParseException:在输入“with pre_file_users AS”时没有可行的替代方案(\n选择id,\n typepid,以防\n当dttm为空或dttm =''然后强制转换('1900-01-01 00:00:00.000作为时间戳)\n 以 dttm\n 结尾,来自 dde_pre_file_user_supp\n )'

我可以在数据块中使用WITH子句吗还是有其他选择吗?

sql apache-spark apache-spark-sql pyspark azure-databricks

4
推荐指数
1
解决办法
5万
查看次数