我正在尝试在我的系统中以独立模式运行 Spark。我的系统当前规格是8核和32GB内存。根据这篇文章,我计算 Spark 配置如下:
spark.driver.memory 2g
spark.executor.cores 3
spark.executor.instances 2
spark.executor.memory 20g
maximizeResourceAllocation TRUE
Run Code Online (Sandbox Code Playgroud)
我在我的 jupyter 笔记本中创建了 Spark 上下文,如下所示,并通过此检查并行度级别
sc = SparkContext()
sc.defaultParallelism
Run Code Online (Sandbox Code Playgroud)
默认并行度为 8。我的问题是,为什么它给我 8,即使我提到了 2 个核心?如果它没有给我系统的实际并行度,那么如何获得实际的并行度?
谢谢你!
我想使用 databricks cli:
databricks clusters list
Run Code Online (Sandbox Code Playgroud)
但这需要一个手动步骤,需要与用户进行交互工作:
databricks configure --token
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以在无需手动干预的情况下使用 databricks cli,以便可以将其作为 ci/cd 管道的一部分运行?
我是Spark SQL的新手。我们正在将数据从SQL Server迁移到Databricks。我正在使用SPARK SQL。您能否建议以下日期函数在SPARK sql中实现以下功能?我可以看到datediff在spark sql中仅给出几天。
DATEDIFF(年,StartDate,EndDate)DATEDIFF(月,StartDate,EndDate)DATEDIFF(四分之一,StartDate,EndDate)
StackOverflow 的一位非常聪明的人帮助我将文件从 Databricks 复制到目录: copyfiles
我使用相同的原理在复制文件后删除文件,如链接所示:
for i in range (0, len(files)):
file = files[i].name
if now in file:
dbutils.fs.rm(files[i].path,'/mnt/adls2/demo/target/' + file)
print ('copied ' + file)
else:
print ('not copied ' + file)
Run Code Online (Sandbox Code Playgroud)
但是,我收到错误:
TypeError: '/mnt/adls2/demo/target/' 的类型错误 - 需要类 bool 。
有人可以让我知道如何解决这个问题吗?我认为在最初使用命令复制文件后删除文件很简单dbutils.fs.rm
我能够建立与 Databricks FileStore 的连接DBFS并访问文件存储。
使用 Pyspark 读取、写入和转换数据是可能的,但是当我尝试使用本地 Python API(例如pathlib或OS模块)时,我无法通过 DBFS 文件系统的第一级
我可以使用一个神奇的命令:
%fs ls dbfs:\mnt\my_fs\...哪个工作完美并列出所有子目录?
但如果我这样做,它会作为返回值os.listdir('\dbfs\mnt\my_fs\')返回['mount.err']
我已经在新集群上进行了测试,结果是相同的
我在 Databricks Runtine 版本 6.1 和 Apache Spark 2.4.4 上使用 Python
有谁能提供建议吗?
连接脚本:
我使用 Databricks CLI 库来存储根据 databricks 文档格式化的凭据:
def initialise_connection(secrets_func):
configs = secrets_func()
# Check if the mount exists
bMountExists = False
for item in dbutils.fs.ls("/mnt/"):
if str(item.name) == r"WFM/":
bMountExists = True
# drop if exists to refresh credentials …Run Code Online (Sandbox Code Playgroud) 在尝试在高并发集群上获取用户数据时,我遇到了这个问题。我正在使用下面的命令来获取用户详细信息
dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('用户')
以下是运行的错误日志。任何帮助将非常感激。
Py4JError: An error occurred while calling o475.tags. Trace:
py4j.security.Py4JSecurityException: Method public scala.collection.immutable.Map com.databricks.backend.common.rpc.CommandContext.tags() is not whitelisted on class class com.databricks.backend.common.rpc.CommandContext
at py4j.security.WhitelistingPy4JSecurityManager.checkCall(WhitelistingPy4JSecurityManager.java:409)
at py4j.Gateway.invoke(Gateway.java:294)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud) 我第一次尝试使用 Databricks CLI。每当我尝试使用 cli 进行某些操作时,它都会向我显示消息:“错误:b'Bad Request'”
这对于我能够进行身份验证的任何基于 cli 的命令都是相同的
(尝试使用错误的令牌并收到身份验证错误)
请找到调试跟踪如下:
命令:databricks fs ls --debug
result:
HTTP debugging enabled
send: b'GET /api/2.0/dbfs/list?path=dbfs%3A%2F HTTP/1.1\r\nHost: xxx-xxxxxxxxxxxxxxxx.xx.azuredatabricks.net\r\nuser-agent: databricks-cli-0.14.0-fs-ls-a2695361-282f-11eb-83c1-34f39aa616fb\r\nAccept-Encoding: gzip, deflate\r\nAccept: */*\r\nConnection: keep-alive\r\nAuthorization: Bearer \x16\r\nContent-Type: text/json\r\n\r\n'
reply: 'HTTP/1.1 400 Bad Request\r\n'
header: content-length: 11
header: content-type: text/plain<br>
header: date: Mon, 16 Nov 2020 17:17:42 GMT
header: server: databricks
header: connection: close
Error: b'Bad Request'
Run Code Online (Sandbox Code Playgroud) 我注意到,当仅使用一个操作启动这堆代码时,我启动了三个作业。
\nfrom typing import List\nfrom pyspark.sql import DataFrame\nfrom pyspark.sql.types import StructType, StructField, StringType\nfrom pyspark.sql.functions import avg\n\ndata: List = [("Diamant_1A", "TopDiamant", "300", "rouge"),\n ("Diamant_2B", "Diamants pour toujours", "45", "jaune"),\n ("Diamant_3C", "Mes diamants pr\xc3\xa9f\xc3\xa9r\xc3\xa9s", "78", "rouge"),\n ("Diamant_4D", "Diamants que j\'aime", "90", "jaune"),\n ("Diamant_5E", "TopDiamant", "89", "bleu")\n ]\n\nschema: StructType = StructType([ \\\n StructField("reference", StringType(), True), \\\n StructField("marque", StringType(), True), \\\n StructField("prix", StringType(), True), \\\n StructField("couleur", StringType(), True)\n ])\n\ndataframe: DataFrame = spark.createDataFrame(data=data,schema=schema)\n\ndataframe_filtree:DataFrame = dataframe.filter("prix > 50")\n\ndataframe_filtree.show()\nRun Code Online (Sandbox Code Playgroud)\n根据我的理解,我应该只得到一个。一项操作对应一项作业。\n我正在使用 Databricks。这可能是问题所在。我有 2 个问题:
\n很高兴终于发布我的第一个问题,但如果我不清楚或违反标准礼仪,请轻推我。我真诚地感谢我能得到的任何帮助。
我正在尝试使用PySpark(在 Databricks 中)并行训练许多语料库(每个语料库对应于不同的作者)的嵌入。每位作者的每个语料库大小不超过 1GB
corpi/df 的形式为:
+----------------+------------------------------------------------------------+
| author| corpus|
+----------------+------------------------------------------------------------+
| john| [["hello"], ["these", "are", "john's", "thoughts"]] |
| steve| [["hello"], ["these", "are", "steve's", "thoughts"]] |
| markus| [["hello"], ["these", "are", "markus's", "thoughts"]] |
+----------------+------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
过去的尝试:
gensimWord2Vec 函数应用于上面的每一行。这适用于小型公司,但无论我将其制作多大,我都会遇到 OutOfMemory 错误spark.executor.memory,这会导致我失去执行者、无限期挂起等......explode上面的数据框转换为每行都是一个句子。然后我想我会做一个df.groupBy('author'),我会定义一个 UDF,它实现 Spark 的Word2Vec模型来训练组并保存模型。不幸的是,即使只使用几个作者、重新分区等......,它也不会让我在不无限期挂起的情况下分解数据框,所以我坚持使用上面显示的数据框。簇 :
配置(受此处启发):
yarn.nodemanager.pmem-check-enabled false
spark.databricks.delta.preview.enabled …Run Code Online (Sandbox Code Playgroud) 我有一个表,其主键为多个列,因此我需要对多个列执行合并逻辑
DeltaTable.forPath(spark, "path")
.as("data")
.merge(
finalDf1.as("updates"),
"data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
.whenMatched
.updateAll()
.whenNotMatched
.insertAll()
.execute()
Run Code Online (Sandbox Code Playgroud)
当我检查数据计数时,它没有按预期更新。
有人可以帮我解决这个问题吗?
databricks ×10
python ×4
apache-spark ×3
pyspark ×3
azure ×2
bash ×1
datediff ×1
delta-lake ×1