标签: azure-databricks

仅当文件不存在时,才使用 dbtuils 复制 Databricks 文件

我正在使用以下 databricks utilites ( dbutils) 命令将文件从一个位置复制到另一个位置,如下所示:

dbutils.fs.cp('adl://dblake.azuredatalakestore.net/jfolder2/thisfile.csv','adl://cadblake.azuredatalakestore.net/landing/')
Run Code Online (Sandbox Code Playgroud)

但是,我希望仅当不thisfile.csv存在具有相同名称“ ”的此类文件时才复制该文件。

有人可以让我知道这是否可能吗?

如果没有,还有其他解决方法吗?

databricks azure-databricks

3
推荐指数
1
解决办法
5117
查看次数

如何重命名 Azure 数据湖上保存的文件

我尝试使用数据块中的 scala 合并 Datalake 中的两个文件,并使用以下代码将其保存回 Datalake:

val df =sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("adl://xxxxxxxx/Test/CSV") 
df.coalesce(1).write.
              format("com.databricks.spark.csv").
              mode("overwrite").
              option("header", "true").
save("adl://xxxxxxxx/Test/CSV/final_data.csv")
Run Code Online (Sandbox Code Playgroud)

但是,文件 Final_data.csv 保存为目录,而不是包含多个文件的文件,并且实际的 .csv 文件保存为“part-00000-tid-dddddddddd-xxxxxxxxxx.csv”。

如何重命名该文件以便将其移动到另一个目录?

scala azure-data-lake azure-databricks

3
推荐指数
1
解决办法
2万
查看次数

如何使用ARM模板创建Databricks集群

我想使用 ARM 模板创建 Databricks 集群。支持吗?如果支持,请提供一些参考。以下是使用ARM模板创建Databricks工作区的代码

"resources": [
    {
      "type": "Microsoft.Databricks/workspaces",
      "name": "[parameters('workspaceName')]",
      "location": "[parameters('location')]",
      "apiVersion": "2018-04-01",
      "sku": {
        "name": "[parameters('pricingTier')]"      
      },
      "tags": {
        "ComponentID": "[parameters('tagComponentID')]",
        "Env": "[parameters('tagEnv')]"
      },
      "properties": {
        "ManagedResourceGroupId": "[concat(subscription().id, '/resourceGroups/', variables('managedResourceGroupName'))]"        
      }
    }
  ]
Run Code Online (Sandbox Code Playgroud)

azure azure-resource-manager databricks azure-databricks

3
推荐指数
1
解决办法
5462
查看次数

Delta Lake 表上的 SQL 视图

我需要在 Databricks 中现有的 Delta Lake 表之上创建一个抽象。是否可以在 Spark 中基于 Delta Lake 表制作 SQL Server 类型的 SQL 视图?

apache-spark databricks azure-databricks delta-lake

3
推荐指数
1
解决办法
6805
查看次数

将 DataFrame 写入 Parquet 或 Delta 似乎没有被并行化 - 耗时太长

问题陈述

我已将分区的 CSV 文件读入 Spark 数据帧。

为了利用 Delta Tables 的改进,我试图简单地将它作为 Delta 导出到 Azure Data Lake Storage Gen2 内的目录中。我在 Databricks 笔记本中使用以下代码:

%scala

df_nyc_taxi.write.partitionBy("year", "month").format("delta").save("/mnt/delta/")
Run Code Online (Sandbox Code Playgroud)

整个数据帧大约有 160 GB。

硬件规格

我正在使用具有 12 个内核和 42 GB RAM 的集群运行此代码。

但是看起来整个写入过程是由 Spark/Databricks 顺序处理的,例如非并行方式

在此处输入图片说明

DAG 可视化如下所示:

在此处输入图片说明

总而言之,这将需要 1-2 个小时才能执行。

问题

  • 有没有办法让 Spark 并行写入不同的分区?
  • 问题可能是我试图将增量表直接写入 Azure Data Lake Storage?

scala apache-spark azure-data-lake databricks azure-databricks

3
推荐指数
1
解决办法
1944
查看次数

删除 pyspark 数据框列中的非 ascii 和特殊字符

我正在从大约有 50 列的 csv 文件中读取数据,很少有列(4 到 5)包含带有非 ASCII 字符和特殊字符的文本数据。

df = spark.read.csv(path, header=True, schema=availSchema)
Run Code Online (Sandbox Code Playgroud)

我正在尝试删除所有非 Ascii 和特殊字符并仅保留英文字符,我尝试按如下方式进行

df = df['textcolumn'].str.encode('ascii', 'ignore').str.decode('ascii')
Run Code Online (Sandbox Code Playgroud)

我的列名中没有空格。我收到一个错误

TypeError: 'Column' object is not callable
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<command-1486957561378215> in <module>
----> 1 InvFilteredDF = InvFilteredDF['SearchResultDescription'].str.encode('ascii', 'ignore').str.decode('ascii')

TypeError: 'Column' object is not callable
Run Code Online (Sandbox Code Playgroud)

是否有其他方法可以完成此操作,感谢您对此的任何帮助。

python apache-spark-sql pyspark pyspark-sql azure-databricks

3
推荐指数
2
解决办法
6100
查看次数

无法删除 Azure Databricks 托管资源组

我在资源组中创建了 Azure Databricks。这将创建具有存储帐户的托管资源组。我的资源组还包含其他数据库服务,例如 Cosmos DB 和 SQL Server。

我的工作完成后删除了资源组。不幸的是,托管资源组没有被删除。注意:我没有删除 Databricks 服务,而是删除了资源组本身。

当我尝试手动删除托管资源组时,出现以下错误:

由于名称为“Azure Databricks 创建的系统拒绝分配”的拒绝分配,访问被拒绝

在 IAM 下,我确实看到了Databricks 创建的拒绝分配。因此,无法删除该资源组。

我的资源组已被删除几天了,但托管资源组仍然存在。

如何删除此托管资源组?

Databricks 托管资源组

azure azure-databricks

3
推荐指数
1
解决办法
8265
查看次数

如何使用 PySpark 计算 ADLS 中的目录大小?

我想计算包含子文件夹和子文件的目录(例如 XYZ)大小。我想要所有文件和 XYZ 内所有内容的总大小。

我可以找到特定路径内的所有文件夹。但我想要所有的尺寸在一起。我也看到

display(dbutils.fs.ls("/mnt/datalake/.../XYZ/.../abc.parquet"))

给我 abc 文件的数据大小。但我想要 XYZ 的完整尺寸。

python apache-spark pyspark databricks azure-databricks

3
推荐指数
1
解决办法
1万
查看次数

org.postgresql.util.PSQLException:SSL 错误:从 Azure Databricks 写入 Azure Postgres Citus 时收到致命警报:handshake_failure

我正在尝试将 pyspark 数据帧写入 Azure Postgres Citus(超大规模)。我正在使用最新的 Postgres JDBC 驱动程序,并尝试在 Databricks Runtime 7、6、5 上编写。

df.write.format("jdbc").option("url","jdbc:postgresql://<HOST>:5432/citus?user=citus&password=<PWD>&sslmode=require" ).option("dbTable", table_name).mode(method).save()

这是运行上述命令后得到的结果 org.postgresql.util.PSQLException: SSL error: Received fatal alert: handshake_failure

我已经尝试过 URL 中的不同参数,也尝试过该选项,但到目前为止还没有运气。但是,我可以使用本地计算机连接到此实例,并使用 psycopg2 在 databricks 驱动程序/笔记本上连接到此实例。Azure Postgres Citus 和 Databricks 都位于同一区域,并且 Azure Postgres Citus 是公共的。

azure apache-spark pyspark azure-postgresql azure-databricks

3
推荐指数
1
解决办法
6117
查看次数

停止 hive 的 RetryingHMSHandler 记录到 databricks 集群

我正在使用 azure databricks 5.5 LTS 以及 Spark 2.4.3 和 scala 2.11。几乎每个发送到 databricks 集群的请求都会出现以下错误日志

ERROR RetryingHMSHandler: NoSuchObjectException(message:There is no database named global_temp)
at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:487)
at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:498)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Run Code Online (Sandbox Code Playgroud)

虽然这不会影响我们正在尝试做的事情的最终结果,但我们的日志不断充满这些内容,并且浏览起来并不是很愉快。我尝试通过将以下属性设置为驱动程序和执行程序来关闭它

log4j.level.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
Run Code Online (Sandbox Code Playgroud)

只是后来才意识到 RetryingHMSHandler 类实际上使用 slf4j 记录器,有没有一种优雅的方法来克服这个问题?

log4j slf4j apache-spark azure-databricks

3
推荐指数
1
解决办法
1603
查看次数