我是 Python 和 Spark 世界的新手。我正在尝试构建一个 pyspark 代码以从 Databricks 发送电子邮件以及来自安装点位置的附件。我使用下面的代码来实现相同的 -
import smtplib
from pathlib import Path
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
from email import encoders
def send_mail(send_from = <from_email>, send_to = <to_email>, subject = "Test", message = "Test", files=["/mnt/<Mounted Point Directory>/"],
server="<SMTP Host>", port=<SMTP Port>, username='<SMTP Username>', password='<SMTP Password>',
use_tls=True):
msg = MIMEMultipart()
msg['From'] = send_from
msg['To'] = COMMASPACE.join(send_to)
msg['Date'] = formatdate(localtime=True)
msg['Subject'] = subject
msg.attach(MIMEText(message))
for path …Run Code Online (Sandbox Code Playgroud) 我试图将 parquet 文件转换为 Excel 文件。但是,当我尝试使用 pandas 或openpyxl引擎这样做时,它显示“ Operation not supported”错误。但是,我可以使用databricks 中的openpyxl引擎读取 excel 文件。
在阅读以下代码时,它正在工作:
xlfile = '/dbfs/mnt/raw/BOMFILE.xlsx'
tmp_csv = '/dbfs/mnt/trusted/BOMFILE.csv'
pdf = pd.DataFrame(pd.read_excel(xlfile, engine='openpyxl'))
pdf.to_csv (tmp_csv, index = None, header=True)
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试使用 openpyxl 和 xlswriter 编写相同的内容时,它不起作用:
parq = '/mnt/raw/PRODUCT.parquet'
final = '/dbfs/mnt/trusted/PRODUCT.xlsx'
df = spark.read.format("parquet").option("header", "true").load(parq)
pandas_df = df.toPandas()
pandas_df.to_excel(final, engine='openpyxl')
#pandas_df.to_excel(outfile, engine='xlsxwriter')#, sheet_name=tbl)
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
FileCreateError: [Errno 95] Operation not supported
OSError: [Errno 95] Operation not supported
During handling of the above exception, another exception occurred: …Run Code Online (Sandbox Code Playgroud) 我有一个笔记本,它将处理该文件并以结构化格式创建数据框。现在我需要导入在另一个笔记本中创建的数据框,但问题是在运行笔记本之前我需要验证仅适用于我需要运行的某些场景。
通常要导入所有数据结构,我们使用%run。但就我而言,它应该是 if 子句和 then notebook run 的组合
if "dataset" in path": %run ntbk_path
它给出错误“路径不存在”
if "dataset" in path": dbutils.notebook.run(ntbk_path)
这个我无法获得所有的数据结构。
有人可以帮我解决这个错误吗?
在 Azure Databricks 中,我想使用 python/pyspark 从多个笔记本同时写入同一组 parquet 文件。我对目标文件进行了分区,因此分区是不相交/独立写入的,这是根据databricks 文档支持的。
但是,我的集群日志中不断出现错误,并且并发写入操作之一失败:
Py4JJavaError: An error occurred while calling o1033.save.
: org.apache.spark.SparkException: Job aborted.
...
Caused by: org.apache.hadoop.fs.PathIOException: `<filePath>/_SUCCESS': Input/output error: Parallel access to the create path detected. Failing request to honor single writer semantics
Run Code Online (Sandbox Code Playgroud)
这是写入 parquet 文件的基本路径。
为什么会发生这种情况?_SUCCESS 文件的用途是什么?我可以通过某种方式禁用它们来避免此问题吗?
我意识到 Databricks 集群有超时,这意味着 N 分钟后它将关闭集群。这是一个示例。
尽管这个功能很好,但它并不是我们所需要的。我们的团队工作日从上午 8 点工作到下午 6 点。我们希望集群能够在上午 8 点自动启动,在工作时间内保持“始终开启”状态,然后在下午 6 点之后超时。合理?
问:这可能吗?
我正在尝试使用 Autoloader 加载多种类型的 csv 文件,它当前将我放入一个大镶木地板表中的所有 csv 合并,我想要的是为每种类型的 schema/csv_file 创建镶木地板表
#Streaming files/ waiting a file to be dropped
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("delimiter", "~|~") \
.option("cloudFiles.inferColumnTypes","true") \
.option("cloudFiles.schemaLocation", pathCheckpoint) \
.load(sourcePath) \
.writeStream \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", pathCheckpoint) \
.start(pathResult)
Run Code Online (Sandbox Code Playgroud)
我正在尝试在 databrick python 笔记本上运行以下脚本:
pip install presidio-image-redactor
pip install pytesseract
python -m spacy download en_core_web_lg
from PIL import Image
from presidio_image_redactor import ImageRedactorEngine
import pytesseract
image = Image.open("images/ImageData.PNG")
engine = ImageRedactorEngine()
redacted_image = engine.redact(image, (255, 192, 203))
Run Code Online (Sandbox Code Playgroud)
运行最后一行后,我收到以下错误:
TesseractNotFoundError:tesseract 未安装或不在您的路径中。
我错过了什么吗?
我将我们的 databricks 集群更新到 Azure Databricks 上的 DBR 9.1 LTS,但是当我尝试使用 Databricks-connect 在 VS Code 中运行我经常使用的包时,会出现错误,而之前的集群则不会。之前的集群在 DBR 8.3 上运行。我也更新了该软件包以与新的 DBR 集群兼容。maven 坐标为
com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.3.0。当我直接在 Databricks 笔记本中运行以下脚本时,它可以工作,但是当我使用 Databricks-connect 运行它时,出现以下错误。
# com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12:4.3.0
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.appName("local").getOrCreate()
dbutils = DBUtils(spark)
cosmosEndpoint = ######################
cosmosMasterKey = ######################
cosmosDatabaseName = ######################
cosmosContainerName = "test"
cfg = {
"spark.cosmos.accountEndpoint": cosmosEndpoint,
"spark.cosmos.accountKey": cosmosMasterKey,
"spark.cosmos.database": cosmosDatabaseName,
"spark.cosmos.container": cosmosContainerName,
}
# Configure …Run Code Online (Sandbox Code Playgroud) python pyspark databricks azure-databricks databricks-connect
错误信息 -job failed with error message The output of the notebook is too large. Cause: rpc response (of 20972488 bytes) exceeds limit of 20971520 bytes
详细信息:我们使用 databricks 笔记本来运行该作业。作业正在作业集群上运行。这是一项流媒体工作。作业开始失败并出现上述错误。
我们的作业中没有任何 display()、show()、print()、explain 方法。
我们在作业中也没有使用awaitAnyTermination 方法。
我们还尝试将“spark.databricks.driver.disableScalaOutput true”添加到作业中,但仍然不起作用。作业因同样的错误而失败。
我们已遵循本文档中提到的所有步骤 - https://learn.microsoft.com/en-us/azure/databricks/kb/jobs/job-cluster-limit-nb-output
我们是否有任何选项可以解决此问题或准确找出哪些命令输出导致其超出 20MB 限制。
我有以下客户表 ddl
CREATE TABLE customer (
name string NOT NULL
,id string NOT NULL DEFAULT 'No ID'
,age INT
) using delta
Run Code Online (Sandbox Code Playgroud)
default在 deltalake 中创建表时出现关键字问题
谁能帮我,如何在 ddl 中为 deltalake 定义默认值?
apache-spark-sql pyspark databricks azure-databricks delta-lake
azure-databricks ×10
databricks ×10
pyspark ×6
python ×3
apache-spark ×2
delta-lake ×1
email ×1
pandas ×1
parquet ×1
scala ×1
tesseract ×1
xlsxwriter ×1