标签: databricks

独立模式下的 Spark 并行性

我正在尝试在我的系统中以独立模式运行 Spark。我的系统当前规格是8核和32GB内存。根据这篇文章,我计算 Spark 配置如下:

spark.driver.memory 2g
spark.executor.cores 3
spark.executor.instances 2
spark.executor.memory 20g
maximizeResourceAllocation TRUE
Run Code Online (Sandbox Code Playgroud)

我在我的 jupyter 笔记本中创建了 Spark 上下文,如下所示,并通过此检查并行度级别

sc = SparkContext()
sc.defaultParallelism
Run Code Online (Sandbox Code Playgroud)

默认并行度为 8。我的问题是,为什么它给我 8,即使我提到了 2 个核心?如果它没有给我系统的实际并行度,那么如何获得实际的并行度?

谢谢你!

apache-spark pyspark databricks

9
推荐指数
1
解决办法
7049
查看次数

如何在不手动配置的情况下使用databricks-cli

我想使用 databricks cli:

databricks clusters list
Run Code Online (Sandbox Code Playgroud)

但这需要一个手动步骤,需要与用户进行交互工作:

databricks configure --token
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以在无需手动干预的情况下使用 databricks cli,以便可以将其作为 ci/cd 管道的一部分运行?

bash azure databricks databricks-cli

9
推荐指数
3
解决办法
9068
查看次数

SPARK SQl中的DATEDIFF

我是Spark SQL的新手。我们正在将数据从SQL Server迁移到Databricks。我正在使用SPARK SQL。您能否建议以下日期函数在SPARK sql中实现以下功能?我可以看到datediff在spark sql中仅给出几天。

DATEDIFF(年,StartDate,EndDate)DATEDIFF(月,StartDate,EndDate)DATEDIFF(四分之一,StartDate,EndDate)

datediff apache-spark-sql databricks

9
推荐指数
1
解决办法
1万
查看次数

使用 dbutils 在 Databricks 中上传后从目录中删除文件

StackOverflow 的一位非常聪明的人帮助我将文件从 Databricks 复制到目录: copyfiles

我使用相同的原理在复制文件后删除文件,如链接所示:

for i in range (0, len(files)):
  file = files[i].name
  if now in file:  
    dbutils.fs.rm(files[i].path,'/mnt/adls2/demo/target/' + file)
    print ('copied     ' + file)
  else:
    print ('not copied ' + file)
Run Code Online (Sandbox Code Playgroud)

但是,我收到错误:

TypeError: '/mnt/adls2/demo/target/' 的类型错误 - 需要类 bool 。

有人可以让我知道如何解决这个问题吗?我认为在最初使用命令复制文件后删除文件很简单dbutils.fs.rm

python databricks azure-databricks

9
推荐指数
2
解决办法
6万
查看次数

尝试访问 Azure Databricks 中的 Azure DBFS 文件系统时出现装载错误

我能够建立与 Databricks FileStore 的连接DBFS并访问文件存储。

使用 Pyspark 读取、写入和转换数据是可能的,但是当我尝试使用本地 Python API(例如pathlibOS模块)时,我无法通过 DBFS 文件系统的第一级

我可以使用一个神奇的命令:

%fs ls dbfs:\mnt\my_fs\...哪个工作完美并列出所有子目录?

但如果我这样做,它会作为返回值os.listdir('\dbfs\mnt\my_fs\')返回['mount.err']

我已经在新集群上进行了测试,结果是相同的

我在 Databricks Runtine 版本 6.1 和 Apache Spark 2.4.4 上使用 Python

有谁能提供建议吗?

编辑 :

连接脚本:

我使用 Databricks CLI 库来存储根据 databricks 文档格式化的凭据:

 def initialise_connection(secrets_func):
  configs = secrets_func()
  # Check if the mount exists
  bMountExists = False
  for item in dbutils.fs.ls("/mnt/"):
      if str(item.name) == r"WFM/":
          bMountExists = True
      # drop if exists to refresh credentials …
Run Code Online (Sandbox Code Playgroud)

python azure databricks azure-databricks

9
推荐指数
1
解决办法
2万
查看次数

在高并发集群上的 Databricks 中获取笔记本内的用户名?

在尝试在高并发集群上获取用户数据时,我遇到了这个问题。我正在使用下面的命令来获取用户详细信息

dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('用户')

以下是运行的错误日志。任何帮助将非常感激。

Py4JError: An error occurred while calling o475.tags. Trace:
py4j.security.Py4JSecurityException: Method public scala.collection.immutable.Map com.databricks.backend.common.rpc.CommandContext.tags() is not whitelisted on class class com.databricks.backend.common.rpc.CommandContext
    at py4j.security.WhitelistingPy4JSecurityManager.checkCall(WhitelistingPy4JSecurityManager.java:409)
    at py4j.Gateway.invoke(Gateway.java:294)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

databricks

9
推荐指数
3
解决办法
1万
查看次数

databricks cli:收到 b'Bad 请求错误

我第一次尝试使用 Databricks CLI。每当我尝试使用 cli 进行某些操作时,它都会向我显示消息:“错误:b'Bad Request'”
这对于我能够进行身份验证的任何基于 cli 的命令都是相同的
(尝试使用错误的令牌并收到身份验证错误)
请找到调试跟踪如下:
命令:databricks fs ls --debug

result:
HTTP debugging enabled
send: b'GET /api/2.0/dbfs/list?path=dbfs%3A%2F HTTP/1.1\r\nHost: xxx-xxxxxxxxxxxxxxxx.xx.azuredatabricks.net\r\nuser-agent: databricks-cli-0.14.0-fs-ls-a2695361-282f-11eb-83c1-34f39aa616fb\r\nAccept-Encoding: gzip, deflate\r\nAccept: */*\r\nConnection: keep-alive\r\nAuthorization: Bearer \x16\r\nContent-Type: text/json\r\n\r\n'
reply: 'HTTP/1.1 400 Bad Request\r\n'
header: content-length: 11
header: content-type: text/plain<br>
header: date: Mon, 16 Nov 2020 17:17:42 GMT
header: server: databricks
header: connection: close
Error: b'Bad Request'
Run Code Online (Sandbox Code Playgroud)

command-line-interface databricks azure-databricks

9
推荐指数
1
解决办法
7381
查看次数

为什么 Spark 会为一项操作创建多个作业?

我注意到,当仅使用一个操作启动这堆代码时,我启动了三个作业。

\n
from typing import List\nfrom pyspark.sql import DataFrame\nfrom pyspark.sql.types import StructType, StructField, StringType\nfrom pyspark.sql.functions import avg\n\ndata: List = [("Diamant_1A", "TopDiamant", "300", "rouge"),\n    ("Diamant_2B", "Diamants pour toujours", "45", "jaune"),\n    ("Diamant_3C", "Mes diamants pr\xc3\xa9f\xc3\xa9r\xc3\xa9s", "78", "rouge"),\n    ("Diamant_4D", "Diamants que j\'aime", "90", "jaune"),\n    ("Diamant_5E", "TopDiamant", "89", "bleu")\n  ]\n\nschema: StructType = StructType([ \\\n    StructField("reference", StringType(), True), \\\n    StructField("marque", StringType(), True), \\\n    StructField("prix", StringType(), True), \\\n    StructField("couleur", StringType(), True)\n  ])\n\ndataframe: DataFrame = spark.createDataFrame(data=data,schema=schema)\n\ndataframe_filtree:DataFrame = dataframe.filter("prix > 50")\n\ndataframe_filtree.show()\n
Run Code Online (Sandbox Code Playgroud)\n

根据我的理解,我应该只得到一个。一项操作对应一项作业。\n我正在使用 Databricks。这可能是问题所在。我有 2 个问题:

\n
    \n …

python apache-spark pyspark databricks

9
推荐指数
2
解决办法
2309
查看次数

使用 PySpark 训练多个词嵌入模型陷入困境

很高兴终于发布我的第一个问题,但如果我不清楚或违反标准礼仪,请轻推我。我真诚地感谢我能得到的任何帮助。

我正在尝试使用PySpark(在 Databricks 中)并行训练许多语料库(每个语料库对应于不同的作者)的嵌入。每位作者的每个语料库大小不超过 1GB

corpi/df 的形式为:

+----------------+------------------------------------------------------------+
|          author|                                                      corpus|
+----------------+------------------------------------------------------------+
|            john| [["hello"], ["these", "are", "john's", "thoughts"]]        |
|           steve| [["hello"], ["these", "are", "steve's", "thoughts"]]       |
|          markus| [["hello"], ["these", "are", "markus's", "thoughts"]]       |
+----------------+------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

过去的尝试:

  • 定义了一个 UDF,它使用 的gensimWord2Vec 函数应用于上面的每一行。这适用于小型公司,但无论我将其制作多大,我都会遇到 OutOfMemory 错误spark.executor.memory,这会导致我失去执行者、无限期挂起等......
  • 现在,我尝试将explode上面的数据框转换为每行都是一个句子。然后我想我会做一个df.groupBy('author'),我会定义一个 UDF,它实现 Spark 的Word2Vec模型来训练组并保存模型。不幸的是,即使只使用几个作者、重新分区等......,它也不会让我在不无限期挂起的情况下分解数据框,所以我坚持使用上面显示的数据框。

簇 :

  • 1 个驱动程序 = 56GB 内存,16 核
  • 1 - 8 个工作人员(自动缩放)56GB 内存,16 核

配置(受此处启发):

yarn.nodemanager.pmem-check-enabled false
spark.databricks.delta.preview.enabled …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark databricks

9
推荐指数
0
解决办法
1660
查看次数

增量表合并多列

我有一个表,其主键为多个列,因此我需要对多个列执行合并逻辑


DeltaTable.forPath(spark, "path")
  .as("data")
  .merge(
    finalDf1.as("updates"),
    "data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()

Run Code Online (Sandbox Code Playgroud)

当我检查数据计数时,它没有按预期更新。

有人可以帮我解决这个问题吗?

databricks azure-databricks delta-lake

9
推荐指数
1
解决办法
9517
查看次数