标签: databricks

使用 Databricks 通过代理连接

我想通过需要身份验证的代理使用 databricks-connect。

我正在使用 Linux 操作系统和 Azure Databricks。我已经在我的家庭网络中配置了 databricks-connect。 Databricks-connect 测试正常工作。

相反,在我的办公室里,我需要设置一个代理。此代理使用基本领域身份验证。通常,我会将环境变量 HTTPS_PROXY 设置为http://username:passwd@host:port

但是,databricks-connect 测试向我抛出一个错误,指出代理变量与正则表达式 HOST:POST 匹配。当然环境变量不匹配。删除 username:passwd 会导致来自代理的 407 响应。

有人知道如何使它工作吗?

提前致谢

databricks azure-databricks

6
推荐指数
0
解决办法
634
查看次数

将附加参数传递给 pyspark 中的 foreachBatch

我在 pyspark 结构化流中使用 foreachBatch 使用 JDBC 将每个微批处理写入 SQL Server。我需要对多个表使用相同的过程,并且我想通过为表名添加一个额外的参数来重用相同的编写器函数,但我不确定如何传递表名参数。

这里的示例非常有用,但在 python 示例中,表名是硬编码的,看起来在 scala 示例中他们引用了一个全局变量(?),我想将表名传递给函数。

上面链接的python示例中给出的函数是:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()
Run Code Online (Sandbox Code Playgroud)

我想使用这样的东西:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()
Run Code Online (Sandbox Code Playgroud)

但我不确定如何通过 foreachBatch 传递附加参数。

apache-spark pyspark databricks spark-structured-streaming

6
推荐指数
1
解决办法
2122
查看次数

从 Scala/Spark 写入 SQL Server 日期时间数据类型

我正在尝试使用与此类似的方法从数据块笔记本中批量插入 SQL Server 表:

批量复制到 Azure SQL 数据库或 SQL Server

这工作正常,直到我尝试写入数据类型为日期时间的列。我试图写入的表具有以下架构:

create table raw.HubDrg_TEST
(
  DrgKey varchar(64) not null,
  LoadDate datetime,
  LoadProcess varchar(255),
  RecordSource varchar(255),
  DrgCode varchar(255)
 )
Run Code Online (Sandbox Code Playgroud)

我的Scala代码如下:

//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load() 

//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "Select …
Run Code Online (Sandbox Code Playgroud)

sql-server scala apache-spark databricks

6
推荐指数
0
解决办法
1290
查看次数

databricks:检查挂载点是否已经挂载

如何在databricks python中安装之前检查安装点是否已经安装?

dbutils.fs.mount
Run Code Online (Sandbox Code Playgroud)

谢谢

python azure databricks azure-databricks

6
推荐指数
3
解决办法
7515
查看次数

在python中安装delta模块的正确方法是什么?

在python中安装delta模块的正确方法是什么?

示例中,他们导入模块

from delta.tables import *

但我没有找到在我的虚拟环境中安装模块的正确方法

目前我正在使用这个火花参数 -

"spark.jars.packages": "io.delta:delta-core_2.11:0.5.0"

pyspark databricks delta-lake

6
推荐指数
3
解决办法
2744
查看次数

如何重命名 Databricks 中的列

如何重命名 Databricks 中的列?

以下不起作用:

ALTER TABLE mySchema.myTable change COLUMN old_name new_name int
Run Code Online (Sandbox Code Playgroud)

它返回错误:

不支持 ALTER TABLE CHANGE COLUMN 将类型为 'IntegerType >(nullable = true)' 的列 'old_name' 更改为类型为 'IntegerType (nullable = true)' 的'new_name';

如果它有所不同,则该表使用的是 Delta Lake,并且它没有按此“old_name”列进行分区或按 z 排序。

databricks delta-lake

6
推荐指数
3
解决办法
9462
查看次数

Databricks 6.1 初始化 Metastore 连接时没有名为 global_temp 的数据库错误

在集群 6.1(包括 Apache Spark 2.4.4、Scala 2.11)(Azure)上初始化 hive Metastore 连接(第一次将数据帧保存为表)时,我可以看到数据库 global_temp 的运行状况检查失败并显示错误:

20/02/18 12:11:17 INFO HiveUtils: Initializing HiveMetastoreConnection version 0.13.0 using file:
...
20/02/18 12:11:21 INFO HiveMetaStore: 0: get_database: global_temp
20/02/18 12:11:21 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_database: global_temp   
20/02/18 12:11:21 ERROR RetryingHMSHandler: NoSuchObjectException(message:There is no database named global_temp)
    at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:487)
    at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:498)
...
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
...
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

这不会导致 python 脚本失败,但会污染日志。

global_temp 数据库不应该自动创建吗?可以关闭支票吗?还是错误被抑制?

azure databricks azure-databricks

6
推荐指数
0
解决办法
1063
查看次数

在 pyspark 中删除重复项时进行聚合

我想分组聚合一个 pyspark 数据框,同时根据该数据框的另一列删除重复项(保留最后一个值)

总之,我想将 dropDuplicates 应用于 GroupedData 对象。因此,对于每个组,我只能动态地保留某一列的一行。

例子

对于下面的数据帧,直接的组聚合将是:

from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1)
    ],
    ("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))

# +---+-------------------+-------+---+
# | id|                 ts|feature| h3|
# +---+-------------------+-------+---+
# |  1|2020-01-01 00:00:00|      1|  1|
# |  2|2020-01-01 00:00:00|      2|  1|
# |  3|2020-01-02 00:00:00|      1|  1|
# |  2|2020-01-02 00:00:00|      1|  1|
# +---+-------------------+-------+---+ …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql pyspark databricks

6
推荐指数
1
解决办法
719
查看次数

Azure Databricks 定价:B2B 订阅与官方页面定价

我从一家公司知道,用于 B2B 非生产订阅的 50,000 个 DBU 可能需要大约 44,000 美元。反过来,在 Databricks 官方定价页面,最优质层的成本为 0.55 美元/DBU(每 50k DBU 27,500 美元)。

您能否解释一下 B2B 订阅 DBU 和官方页面 Data Analytics Pemium SKU DBU 之间的区别?

为什么价格相差如此之大?除了支持/fastrack 之外还有什么(作为 B2B 的一部分)吗?

希望你不需要发布私人信息来回答我的问题。但我需要了解主要原因,以便能够为未来的项目计划成本。

UPD

Databricks B2B 订阅为您提供不同使用层(轻/工程/分析)的选择。相反,每个捆绑包(DBU 量)都有一个选项(价格)。该选项比最昂贵的 Analytics 层要贵得多。

databricks azure-databricks

6
推荐指数
1
解决办法
409
查看次数

在大量分区上处理 upsert 不够快

问题

我们在 ADLS Gen2 之上有一个 Delta Lake 设置,其中包含下表:

  • bronze.DeviceData: 按到达日期划分 ( Partition_Date)
  • silver.DeviceData:按事件日期和时间(Partition_DatePartition_Hour)分区

我们从事件中心摄取大量数据(每天超过 6 亿条记录)到bronze.DeviceData(仅追加)。然后我们以流方式处理新文件,并silver.DeviceData使用 delta MERGE 命令将它们更新插入(见下文)。

到达铜牌表的数据可以包含来自任何银牌分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的>90% 的数据都来自分区Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)。因此,为了更新数据,我们有以下两个 spark 作业:

  • “快速”:处理来自上述三个日期分区的数据。延迟在这里很重要,所以我们优先考虑这些数据
  • “慢”:处理其余部分(什么,但是这三个日期的分区)。延迟并不重要,但它应该在“合理”的时间内(我会说不超过一周)

现在我们来解决这个问题:虽然在“慢”工作中数据量少了很多,但它运行数天只是为了处理一天的慢青铜数据,有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,因此这些合并命令效率低下。

而且,随着时间的推移,这个缓慢的工作会变得越来越慢,因为它接触到的银色分区会增长。

问题

  1. 我们的分区方案和快速/慢速 Spark 作业设置通常是解决这个问题的好方法吗?
  2. 可以做些什么来改进这种设置?我们希望降低缓慢作业的成本和延迟,并找到一种方法,使其随着每天到达的数据量以青铜级而不是银级表的大小而增长

附加信息

  • 我们需要 MERGE 命令,因为某些上游服务可以重新处理历史数据,然后也应该更新 Silver 表
  • 银桌的架构:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark databricks delta-lake azure-data-lake-gen2

6
推荐指数
1
解决办法
441
查看次数