我想通过需要身份验证的代理使用 databricks-connect。
我正在使用 Linux 操作系统和 Azure Databricks。我已经在我的家庭网络中配置了 databricks-connect。 Databricks-connect 测试正常工作。
相反,在我的办公室里,我需要设置一个代理。此代理使用基本领域身份验证。通常,我会将环境变量 HTTPS_PROXY 设置为http://username:passwd@host:port。
但是,databricks-connect 测试向我抛出一个错误,指出代理变量与正则表达式 HOST:POST 匹配。当然环境变量不匹配。删除 username:passwd 会导致来自代理的 407 响应。
有人知道如何使它工作吗?
提前致谢
我在 pyspark 结构化流中使用 foreachBatch 使用 JDBC 将每个微批处理写入 SQL Server。我需要对多个表使用相同的过程,并且我想通过为表名添加一个额外的参数来重用相同的编写器函数,但我不确定如何传递表名参数。
这里的示例非常有用,但在 python 示例中,表名是硬编码的,看起来在 scala 示例中他们引用了一个全局变量(?),我想将表名传递给函数。
上面链接的python示例中给出的函数是:
def writeToSQLWarehose(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
Run Code Online (Sandbox Code Playgroud)
我想使用这样的东西:
def writeToSQLWarehose(df, epochId, tableName):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", tableName) \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
Run Code Online (Sandbox Code Playgroud)
但我不确定如何通过 foreachBatch 传递附加参数。
我正在尝试使用与此类似的方法从数据块笔记本中批量插入 SQL Server 表:
批量复制到 Azure SQL 数据库或 SQL Server
这工作正常,直到我尝试写入数据类型为日期时间的列。我试图写入的表具有以下架构:
create table raw.HubDrg_TEST
(
DrgKey varchar(64) not null,
LoadDate datetime,
LoadProcess varchar(255),
RecordSource varchar(255),
DrgCode varchar(255)
)
Run Code Online (Sandbox Code Playgroud)
我的Scala代码如下:
//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load()
//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "Select …Run Code Online (Sandbox Code Playgroud) 在python中安装delta模块的正确方法是什么?
在示例中,他们导入模块
from delta.tables import *
但我没有找到在我的虚拟环境中安装模块的正确方法
目前我正在使用这个火花参数 -
"spark.jars.packages": "io.delta:delta-core_2.11:0.5.0"
如何重命名 Databricks 中的列?
以下不起作用:
ALTER TABLE mySchema.myTable change COLUMN old_name new_name int
Run Code Online (Sandbox Code Playgroud)
它返回错误:
不支持 ALTER TABLE CHANGE COLUMN 将类型为 'IntegerType >(nullable = true)' 的列 'old_name' 更改为类型为 'IntegerType (nullable = true)' 的'new_name';
如果它有所不同,则该表使用的是 Delta Lake,并且它没有按此“old_name”列进行分区或按 z 排序。
在集群 6.1(包括 Apache Spark 2.4.4、Scala 2.11)(Azure)上初始化 hive Metastore 连接(第一次将数据帧保存为表)时,我可以看到数据库 global_temp 的运行状况检查失败并显示错误:
20/02/18 12:11:17 INFO HiveUtils: Initializing HiveMetastoreConnection version 0.13.0 using file:
...
20/02/18 12:11:21 INFO HiveMetaStore: 0: get_database: global_temp
20/02/18 12:11:21 INFO audit: ugi=root ip=unknown-ip-addr cmd=get_database: global_temp
20/02/18 12:11:21 ERROR RetryingHMSHandler: NoSuchObjectException(message:There is no database named global_temp)
at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:487)
at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:498)
...
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
...
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
这不会导致 python 脚本失败,但会污染日志。
global_temp 数据库不应该自动创建吗?可以关闭支票吗?还是错误被抑制?
我想分组聚合一个 pyspark 数据框,同时根据该数据框的另一列删除重复项(保留最后一个值)。
总之,我想将 dropDuplicates 应用于 GroupedData 对象。因此,对于每个组,我只能动态地保留某一列的一行。
对于下面的数据帧,直接的组聚合将是:
from pyspark.sql import functions
dataframe = spark.createDataFrame(
[
(1, "2020-01-01", 1, 1),
(2, "2020-01-01", 2, 1),
(3, "2020-01-02", 1, 1),
(2, "2020-01-02", 1, 1)
],
("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id| ts|feature| h3|
# +---+-------------------+-------+---+
# | 1|2020-01-01 00:00:00| 1| 1|
# | 2|2020-01-01 00:00:00| 2| 1|
# | 3|2020-01-02 00:00:00| 1| 1|
# | 2|2020-01-02 00:00:00| 1| 1|
# +---+-------------------+-------+---+ …Run Code Online (Sandbox Code Playgroud) 我从一家公司知道,用于 B2B 非生产订阅的 50,000 个 DBU 可能需要大约 44,000 美元。反过来,在 Databricks 官方定价页面,最优质层的成本为 0.55 美元/DBU(每 50k DBU 27,500 美元)。
您能否解释一下 B2B 订阅 DBU 和官方页面 Data Analytics Pemium SKU DBU 之间的区别?
为什么价格相差如此之大?除了支持/fastrack 之外还有什么(作为 B2B 的一部分)吗?
希望你不需要发布私人信息来回答我的问题。但我需要了解主要原因,以便能够为未来的项目计划成本。
Databricks B2B 订阅不为您提供不同使用层(轻/工程/分析)的选择。相反,每个捆绑包(DBU 量)都有一个选项(价格)。该选项比最昂贵的 Analytics 层要贵得多。
问题
我们在 ADLS Gen2 之上有一个 Delta Lake 设置,其中包含下表:
bronze.DeviceData: 按到达日期划分 ( Partition_Date)silver.DeviceData:按事件日期和时间(Partition_Date和Partition_Hour)分区我们从事件中心摄取大量数据(每天超过 6 亿条记录)到bronze.DeviceData(仅追加)。然后我们以流方式处理新文件,并silver.DeviceData使用 delta MERGE 命令将它们更新插入(见下文)。
到达铜牌表的数据可以包含来自任何银牌分区的数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的>90% 的数据都来自分区Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)。因此,为了更新数据,我们有以下两个 spark 作业:
现在我们来解决这个问题:虽然在“慢”工作中数据量少了很多,但它运行数天只是为了处理一天的慢青铜数据,有一个大集群。原因很简单:它必须读取和更新许多银分区(有时> 1000 个日期分区),并且由于更新很小但日期分区可能是千兆字节,因此这些合并命令效率低下。
而且,随着时间的推移,这个缓慢的工作会变得越来越慢,因为它接触到的银色分区会增长。
问题
附加信息
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the …Run Code Online (Sandbox Code Playgroud) scala apache-spark databricks delta-lake azure-data-lake-gen2
databricks ×10
apache-spark ×4
delta-lake ×3
pyspark ×3
azure ×2
scala ×2
dataframe ×1
python ×1
sql-server ×1