我正在尝试从 Databricks 连接到 azure 表存储。我似乎找不到任何不进入 blob 容器的资源,但我尝试修改表的资源。
spark.conf.set(
"fs.azure.account.key.accountname.table.core.windows.net",
"accountkey")
blobDirectPath = "wasbs://accountname.table.core.windows.net/TableName"
df = spark.read.parquet(blobDirectPath)
Run Code Online (Sandbox Code Playgroud)
我现在假设表格是镶木地板文件。我现在收到此代码的身份验证错误。
寻找一种使用 Azure 文件 SDK 将文件上传到我的 azure databricks blob 存储的方法
我使用此页面中的功能尝试了很多事情
但没有任何效果。我不明白为什么
例子:
file_service = FileService(account_name='MYSECRETNAME', account_key='mySECRETkey')
generator = file_service.list_directories_and_files('MYSECRETNAME/test') #listing file in folder /test, working well
for file_or_dir in generator:
print(file_or_dir.name)
file_service.get_file_to_path('MYSECRETNAME','test/tables/input/referentials/','test.xlsx','/dbfs/FileStore/test6.xlsx')
Run Code Online (Sandbox Code Playgroud)
其中 test.xlsx = 我的 azure 文件中的文件名
/dbfs/FileStore/test6.xlsx => 在我的 dbfs 系统中上传文件的路径
我有错误消息:
异常=指定的资源名称包含无效字符
尝试更改名称但似乎不起作用
编辑:我什至不确定该功能是否正在执行我想要的操作。从天蓝色文件加载文件的最佳方法是什么?
我正在 databricks 中尝试这个。请让我知道需要导入的 pyspark 库以及在 Azure databricks pyspark 中获取以下输出的代码
示例:- 输入数据框:-
| column1 | column2 | column3 | column4 |
| a | bbbbb | cc | >dddddddd |
| >aaaaaaaaaaaaaa | bb | c | dddd |
| aa | >bbbbbbbbbbbb | >ccccccc | ddddd |
| aaaaa | bbbb | ccc | d |
Run Code Online (Sandbox Code Playgroud)
输出数据帧:-
| column | maxLength |
| column1 | 14 |
| column2 | 12 |
| column3 | 7 |
| column4 | …Run Code Online (Sandbox Code Playgroud) 我必须合并许多 Spark DataFrame。合并后,我想在具有相同名称的多个列之间执行合并。
我能够根据这个问题创建一个最小的例子。
但是,我需要一段更通用的代码来支持:一组要合并的变量(在示例中set_vars = set(('var1','var2')))和多个连接键(在示例中join_keys = set(('id')))。
是否有更简洁(更通用)的方法来获得此结果pyspark?
df1 = spark.createDataFrame([
( 1, None , "aa"),
( 2 , "a", None ),
( 3 , "b", None),
( 4 , "h", None),],
"id int, var1 string, var2 string",
)
df2 = spark.createDataFrame([
( 1, "f" , "Ba"),
( 2 , "a", "bb" ),
( 3 , "b", None),],
"id int, var1 string, var2 string",
)
df1 = df1.alias("df1")
df2 = …Run Code Online (Sandbox Code Playgroud) 我看到很多例子需要在 rdd.map 上使用 lambda 。
只是想知道我们是否可以做如下的事情:
df.withColumn('newcol',(lambda x: x['col1'] + x['col2'])).show()
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 PySpark 从 Databricks Hive_Metastore 读取数据。在下面的屏幕截图中,我尝试读取位于数据库中的名为“trips”的表nyctaxi。
通常,如果该表位于 AzureSQL 服务器上,我将使用如下代码:
df = spark.read.format("jdbc")\
.option("url", jdbcUrl)\
.option("dbtable", tableName)\
.load()
Run Code Online (Sandbox Code Playgroud)
或者,如果该表位于 ADLS 中,我将使用类似于以下内容的代码:
df = spark.read.csv("adl://mylake.azuredatalakestore.net/tableName.csv",header=True)
Run Code Online (Sandbox Code Playgroud)
有人可以告诉我如何使用 PySpark 从下面的 Databricks 数据库中读取表格:
附加的屏幕截图我也有帮助
好吧,我刚刚意识到我认为我应该问如何从“samples”meta_store 中读取表格。
无论如何,我希望帮助您从nyctaxi数据库中读取“trips”表。
我必须在 Databricks 中安装 pyodbc 模块。我曾尝试使用此命令 ( pip install pyodbc) 但由于以下错误而失败。
我正在尝试使用 databricks cli 并调用 databricks configure 这就是我从 cmd 中执行的操作
somepath>databricks configure --token
Databricks Host (should begin with https://): my_https_address
Token: my_token
Run Code Online (Sandbox Code Playgroud)
我想使用 R 调用相同的命令。所以我做了:
tool.control <- c('databricks configure --token'
,'my_https_address'
,'my_token')
shell(tool.control)
Run Code Online (Sandbox Code Playgroud)
我收到以下错误
Error in system(command, as.integer(flag), f, stdout, stderr, timeout) :
character string expected as first argument
Run Code Online (Sandbox Code Playgroud)
我该如何纠正?
编辑:尝试评论中的建议后,我收到此错误:
Databricks Host (should begin with https://): Aborted!
'https:' is not recognized as an internal or external command,
operable program or batch file.
'my_token' is not recognized as an internal or …Run Code Online (Sandbox Code Playgroud) 我在 Databricks Notebook 上编写了以下 PySpark 代码,它使用以下代码行成功地将结果从 sparkSQL 保存到 Azure Cosmos DB:
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)
完整代码如下:
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID1
,Sales.InvoiceNumber AS myinvoicenr1
FROM Sales
limit 4""")
## my personal cosmos DB
writeConfig3 = {
"Endpoint": "https://<cosmosdb-account>.documents.azure.com:443/",
"Masterkey": "<key>==",
"Database": "mydatabase",
"Collection": "mycontainer",
"Upsert": "true"
}
df = test.coalesce(1)
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)
使用上面的代码我已经成功地写入了我的 Cosmos DB 数据库(mydatabase)和集合(mycontainer)

当我尝试通过使用以下更改 SparkSQL 来覆盖容器时(只需将 pattersonID1 更改为 pattersonID2,并将 myinvoicenr1 更改为 myinvoicenr2
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID2
,Sales.InvoiceNumber AS myinvoicenr2
FROM Sales
limit 4""")
Run Code Online (Sandbox Code Playgroud)
相反,使用新查询覆盖/更新集合 Cosmos DB 会按如下方式附加容器:
并且仍然在集合中保留原始查询: …
我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)
目标是处理约 5.5 TB 的数据
我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟
我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:
代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)
val jsonDF = spark.read.json("/mnt/myfile")
val res = jsonDF
.withColumn("row", row_number.over(w))
.where($"row" === 1)
.drop("row")
res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)
然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql databricks azure-databricks
我已将 Azure Databricks 群集从运行时 5.5LTS 更新到 7.3LTS。现在我在 VSCode 中调试时遇到错误。我已经更新了我的 Anaconda 连接,如下所示:
> conda create --name dbconnect python=3.7
> conda activate dbconnect
> pip uninstall pyspark
> pip install -U databricks-connect==7.3.*
> databricks-connect configure
> databricks-connect test
Run Code Online (Sandbox Code Playgroud)
到目前为止一切顺利,但现在我正在尝试调试以下内容
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
setting = spark.conf.get("spark.master")
if "local" in setting:
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark.sparkContext)
Run Code Online (Sandbox Code Playgroud)
在 上dbutils = DBUtils(spark.sparkContext),它抛出异常
发生异常:AttributeError“SparkContext”对象没有属性“conf”
我尝试过创建conf
from pyspark.dbutils import DBUtils
import pyspark
conf = pyspark.SparkConf()
pyspark.SparkContext.getOrCreate(conf=conf)
dbutils = DBUtils(spark.sparkContext) …Run Code Online (Sandbox Code Playgroud) azure-databricks ×11
pyspark ×7
python ×5
databricks ×3
apache-spark ×2
azure ×1
azure-files ×1
coalesce ×1
command-line ×1
pyodbc ×1
pyspark-sql ×1
python-3.x ×1
r ×1
scala ×1