Car*_*onp 0 pyspark pyspark-sql azure-cosmosdb azure-databricks
我在 Databricks Notebook 上编写了以下 PySpark 代码,它使用以下代码行成功地将结果从 sparkSQL 保存到 Azure Cosmos DB:
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)
完整代码如下:
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID1
,Sales.InvoiceNumber AS myinvoicenr1
FROM Sales
limit 4""")
## my personal cosmos DB
writeConfig3 = {
"Endpoint": "https://<cosmosdb-account>.documents.azure.com:443/",
"Masterkey": "<key>==",
"Database": "mydatabase",
"Collection": "mycontainer",
"Upsert": "true"
}
df = test.coalesce(1)
df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)
使用上面的代码我已经成功地写入了我的 Cosmos DB 数据库(mydatabase)和集合(mycontainer)

当我尝试通过使用以下更改 SparkSQL 来覆盖容器时(只需将 pattersonID1 更改为 pattersonID2,并将 myinvoicenr1 更改为 myinvoicenr2
test = spark.sql("""SELECT
Sales.CustomerID AS pattersonID2
,Sales.InvoiceNumber AS myinvoicenr2
FROM Sales
limit 4""")
Run Code Online (Sandbox Code Playgroud)
相反,使用新查询覆盖/更新集合 Cosmos DB 会按如下方式附加容器:
并且仍然在集合中保留原始查询:
有没有办法完全覆盖或更新 Cosmos DB?
您的问题是文档具有唯一性id(您从未指定的内容,因此作为 guid 为您自动生成)。当您编写新文档时,您刚刚将一个非id、非唯一属性 、pattersonID1、重命名为pattersonID2,并且正如预期的那样,它只是创建了一个新文档。没有办法知道这个新文档是否与原始文档相关,因为它是一个全新的文档,具有自己的一组属性。
您可以通过查询(或阅读)、修改和替换现有文档来更新现有文档。或者您可以选择查询旧文档并删除它们(一个一个,或者一个分区内的一批删除事务,通过存储过程)。最后,您可以删除并重新创建一个容器,这将删除当前存储在其中的所有文档。
| 归档时间: |
|
| 查看次数: |
2402 次 |
| 最近记录: |