标签: azure-databricks

从 Azure databricks 连接到 Azure 表存储

我正在尝试从 Databricks 连接到 azure 表存储。我似乎找不到任何不进入 blob 容器的资源,但我尝试修改表的资源。

spark.conf.set(
  "fs.azure.account.key.accountname.table.core.windows.net",
  "accountkey")

blobDirectPath = "wasbs://accountname.table.core.windows.net/TableName"

df = spark.read.parquet(blobDirectPath)
Run Code Online (Sandbox Code Playgroud)

我现在假设表格是镶木地板文件。我现在收到此代码的身份验证错误。

python azure-table-storage pyspark azure-databricks

1
推荐指数
1
解决办法
4121
查看次数

将文件从 Azure 文件加载到 Azure Databricks

寻找一种使用 Azure 文件 SDK 将文件上传到我的 azure databricks blob 存储的方法

我使用此页面中的功能尝试了很多事情

但没有任何效果。我不明白为什么

例子:

file_service = FileService(account_name='MYSECRETNAME', account_key='mySECRETkey')

generator = file_service.list_directories_and_files('MYSECRETNAME/test') #listing file in folder /test, working well
for file_or_dir in generator:
    print(file_or_dir.name)

file_service.get_file_to_path('MYSECRETNAME','test/tables/input/referentials/','test.xlsx','/dbfs/FileStore/test6.xlsx')
Run Code Online (Sandbox Code Playgroud)

其中 test.xlsx = 我的 azure 文件中的文件名

/dbfs/FileStore/test6.xlsx => 在我的 dbfs 系统中上传文件的路径

我有错误消息:

异常=指定的资源名称包含无效字符

尝试更改名称但似乎不起作用

编辑:我什至不确定该功能是否正在执行我想要的操作。从天蓝色文件加载文件的最佳方法是什么?

python azure azure-storage azure-files azure-databricks

1
推荐指数
1
解决办法
5352
查看次数

pyspark 数据框中每列的最大字符串长度

我正在 databricks 中尝试这个。请让我知道需要导入的 pyspark 库以及在 Azure databricks pyspark 中获取以下输出的代码

示例:- 输入数据框:-

|     column1     |    column2    | column3  |  column4  |

| a               | bbbbb         | cc       | >dddddddd |
| >aaaaaaaaaaaaaa | bb            | c        | dddd      |
| aa              | >bbbbbbbbbbbb | >ccccccc | ddddd     |
| aaaaa           | bbbb          | ccc      | d         |
Run Code Online (Sandbox Code Playgroud)

输出数据帧:-

| column  | maxLength |

| column1 |        14 |
| column2 |        12 |
| column3 |         7 |
| column4 | …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark azure-databricks

1
推荐指数
1
解决办法
1万
查看次数

join pyspark 中多列的通用合并

我必须合并许多 Spark DataFrame。合并后,我想在具有相同名称的多个列之间执行合并。

我能够根据这个问题创建一个最小的例子。

但是,我需要一段更通用的代码来支持:一组要合并的变量(在示例中set_vars = set(('var1','var2')))和多个连接键(在示例中join_keys = set(('id')))。

是否有更简洁(更通用)的方法来获得此结果pyspark

df1 =  spark.createDataFrame([ 
        ( 1, None ,  "aa"),
        ( 2 , "a", None ),
        ( 3 , "b",  None),
        ( 4 , "h",  None),],
        "id int, var1 string, var2 string",
       )

df2 =  spark.createDataFrame([ 
        ( 1, "f" ,  "Ba"),
        ( 2 , "a", "bb" ),
        ( 3 , "b",  None),],
        "id int, var1 string, var2 string",
       )

df1 = df1.alias("df1")
df2 = …
Run Code Online (Sandbox Code Playgroud)

python coalesce pyspark azure-databricks

1
推荐指数
1
解决办法
6654
查看次数

Spark Dataframe lambda 直接在数据帧上

我看到很多例子需要在 rdd.map 上使用 lambda 。
只是想知道我们是否可以做如下的事情:

df.withColumn('newcol',(lambda x: x['col1'] + x['col2'])).show()
Run Code Online (Sandbox Code Playgroud)

pyspark azure-databricks

1
推荐指数
1
解决办法
5516
查看次数

使用 PySpark 从 Databricks 数据库 (hive_metastore ) 读取/提取数据

我正在尝试使用 PySpark 从 Databricks Hive_Metastore 读取数据。在下面的屏幕截图中,我尝试读取位于数据库中的名为“trips”的表nyctaxi

通常,如果该表位于 AzureSQL 服务器上,我将使用如下代码:

df = spark.read.format("jdbc")\
    .option("url", jdbcUrl)\
    .option("dbtable", tableName)\
    .load()
Run Code Online (Sandbox Code Playgroud)

或者,如果该表位于 ADLS 中,我将使用类似于以下内容的代码:

df = spark.read.csv("adl://mylake.azuredatalakestore.net/tableName.csv",header=True)
Run Code Online (Sandbox Code Playgroud)

有人可以告诉我如何使用 PySpark 从下面的 Databricks 数据库中读取表格:

在此输入图像描述

附加的屏幕截图我也有帮助

在此输入图像描述

好吧,我刚刚意识到我认为我应该问如何从“samples”meta_store 中读取表格。

无论如何,我希望帮助您从nyctaxi数据库中读取“trips”表。

python apache-spark-sql pyspark azure-databricks

1
推荐指数
1
解决办法
3536
查看次数

如何在 Databricks 中安装 PYODBC

我必须在 Databricks 中安装 pyodbc 模块。我曾尝试使用此命令 ( pip install pyodbc) 但由于以下错误而失败。

错误信息

pyodbc python-3.x databricks azure-databricks

0
推荐指数
2
解决办法
1万
查看次数

使用 cmd 和 R 配置数据块

我正在尝试使用 databricks cli 并调用 databricks configure 这就是我从 cmd 中执行的操作

  somepath>databricks configure --token
  Databricks Host (should begin with https://): my_https_address 
  Token: my_token
Run Code Online (Sandbox Code Playgroud)

我想使用 R 调用相同的命令。所以我做了:

  tool.control <- c('databricks configure --token'
                    ,'my_https_address'
                    ,'my_token')

 shell(tool.control)
Run Code Online (Sandbox Code Playgroud)

我收到以下错误

  Error in system(command, as.integer(flag), f, stdout, stderr, timeout) : 
  character string expected as first argument
Run Code Online (Sandbox Code Playgroud)

我该如何纠正?

编辑:尝试评论中的建议后,我收到此错误:

Databricks Host (should begin with https://): Aborted!
'https:' is not recognized as an internal or external command,
 operable program or batch file.
 'my_token' is not recognized as an internal or …
Run Code Online (Sandbox Code Playgroud)

command-line r databricks azure-databricks

0
推荐指数
1
解决办法
2687
查看次数

如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合

我在 Databricks Notebook 上编写了以下 PySpark 代码,它使用以下代码行成功地将结果从 sparkSQL 保存到 Azure Cosmos DB:

df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)

完整代码如下:

test = spark.sql("""SELECT
  Sales.CustomerID AS pattersonID1
 ,Sales.InvoiceNumber AS myinvoicenr1
FROM Sales
limit 4""")


## my personal cosmos DB
writeConfig3 = {
    "Endpoint": "https://<cosmosdb-account>.documents.azure.com:443/",
    "Masterkey": "<key>==",
    "Database": "mydatabase",
    "Collection": "mycontainer",
    "Upsert": "true"
}

df = test.coalesce(1)

df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)

使用上面的代码我已经成功地写入了我的 Cosmos DB 数据库(mydatabase)和集合(mycontainer) 在此处输入图片说明

当我尝试通过使用以下更改 SparkSQL 来覆盖容器时(只需将 pattersonID1 更改为 pattersonID2,并将 myinvoicenr1 更改为 myinvoicenr2

test = spark.sql("""SELECT
  Sales.CustomerID AS pattersonID2
 ,Sales.InvoiceNumber AS myinvoicenr2
FROM Sales
limit 4""")
Run Code Online (Sandbox Code Playgroud)

相反,使用新查询覆盖/更新集合 Cosmos DB 会按如下方式附加容器:

在此处输入图片说明

并且仍然在集合中保留原始查询: …

pyspark pyspark-sql azure-cosmosdb azure-databricks

0
推荐指数
1
解决办法
2402
查看次数

Databricks SparkException超过spark.driver.maxResultSize

我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)

目标是处理约 5.5 TB 的数据

我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟

我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:

  • 有没有更好的分区方式?
  • 如何解决这个问题?

代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

val jsonDF = spark.read.json("/mnt/myfile")

val res = jsonDF
      .withColumn("row", row_number.over(w))
      .where($"row" === 1)
      .drop("row")

res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)

然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql databricks azure-databricks

0
推荐指数
1
解决办法
4292
查看次数

升级集群的 Databricks 运行时后调试 PySpark 时出错

我已将 Azure Databricks 群集从运行时 5.5LTS 更新到 7.3LTS。现在我在 VSCode 中调试时遇到错误。我已经更新了我的 Anaconda 连接,如下所示:

> conda create --name dbconnect python=3.7
> conda activate dbconnect
> pip uninstall pyspark
> pip install -U databricks-connect==7.3.*
> databricks-connect configure
> databricks-connect test
Run Code Online (Sandbox Code Playgroud)

到目前为止一切顺利,但现在我正在尝试调试以下内容

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
setting = spark.conf.get("spark.master")

if "local" in setting:
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark.sparkContext)
Run Code Online (Sandbox Code Playgroud)

在 上dbutils = DBUtils(spark.sparkContext),它抛出异常

发生异常:AttributeError“SparkContext”对象没有属性“conf”

我尝试过创建conf

from pyspark.dbutils import DBUtils
import pyspark
conf = pyspark.SparkConf()
pyspark.SparkContext.getOrCreate(conf=conf)
dbutils = DBUtils(spark.sparkContext) …
Run Code Online (Sandbox Code Playgroud)

python pyspark azure-databricks databricks-connect

0
推荐指数
1
解决办法
313
查看次数