我创建了一个 DataFrame,我想将其写入/导出到表中的 Azure DataLake Gen2 旁边(需要为此创建新表)。
将来我还需要使用新的 DataFrame 更新此 Azure DL Gen2 表。
在 Azure Databricks 中,我创建了一个连接 Azure Databricks -> Azure DataLake 来查看我的文件:
感谢如何在 Spark / pyspark 中编写它的帮助。
谢谢你!
当我运行下面的代码时,出现错误java.lang.AssertionError:assertionfailed:Foundduplicaterewriteattributes。在更新我们的 databricks 运行时之前,它运行得很顺利。
top10_df 是列表中具有唯一键的数据的数据框groups。
res_df 是 top10_df 中唯一键与最小和最大日期的聚合。
创建并保存 res_df 后,它会根据组中的唯一键重新连接到 top10_df 中。
groups = ['col1','col2','col3','col4']
min_date_created = fn.min('date_created').alias('min_date_created')
max_date_created = fn.max('date_created').alias('max_date_created')
res_df = (top10_df
.groupBy(groups)
.agg(min_date_created
,max_date_created
)
)
res_df.persist()
print(res_df.count())
score_rank = fn.row_number().over(w.partitionBy(groups).orderBy(fn.desc('score')))
unique_issue_id = fn.row_number().over(w.orderBy(groups))
out_df = (top10_df.alias('t10')
.join(res_df.alias('res'),groups,'left')
.where(fn.col('t10.date_created')==fn.col('res.max_date_created'))
.drop(fn.col('t10.date_created'))
.drop(fn.col('t10.date_updated'))
.withColumn('score_rank',score_rank)
.where(fn.col('score_rank')==1)
.drop('score_rank'
,'latest_revision_complete_hash'
,'latest_revision_durable_hash'
)
.withColumn('unique_issue_id',unique_issue_id)
.withColumnRenamed('res.id','resource_id')
)
out_df.persist()
print(out_df.count())
Run Code Online (Sandbox Code Playgroud) 我遇到了数据块中记录损坏的问题。我们想要对损坏的记录进行计数,并将损坏的记录保存在特定位置作为增量表。为此,我们正在阅读PERMISSIVE本_corrupt_record专栏的使用内容并进行查询。
我们在 Azure Databricks 中将 pyspark 与 Apache Spark 3.0.1 结合使用。
这是我们收到的错误消息:
从 Spark 2.3 开始,当引用的列仅包含内部损坏记录列(默认情况下名为 _corrupt_record)时,不允许从原始 JSON/CSV 文件进行查询。例如:spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()和spark.read.schema(schema).json(file).select("_corrupt_record" )。展示()。
根据此文档,如果要查询列损坏记录,则必须缓存或保存数据。
但我们不想在 ETL 中缓存数据。ETL 用于在同一集群上运行的许多作业,我们可以将 150GB 的大文件作为输入。缓存数据可能会导致集群崩溃。
有没有办法在不缓存数据的情况下查询这些损坏的记录?
#1 将数据保存在 blob 存储上可能是另一种选择,但这听起来开销很大。
#2 我们还尝试使用选项BadRecordsPath:将坏记录保存到 BadRecordsPath 并读回以进行计数,但是没有简单的方法可以知道坏记录文件是否已被写入(以及该文件位于哪个分区)书面)。分区看起来像/20210425T102409/bad_records
在这里查看我的其他问题
#3 另一种方法是从许可读取中减去 dropmalformed 读取。例如:
dataframe_with_corrupt = spark.read.format('csv').option("mode", "PERMISSIVE").load(path)
dataframe_without_corrupt = spark.read.format('csv').option("mode", "DROPMALFORMED").load(path)
corrupt_df = dataframe_with_corrupt.exceptAll(dataframe_without_corrupt)
Run Code Online (Sandbox Code Playgroud)
但我不确定它会比缓存占用更少的内存!
任何建议或意见将不胜感激!提前致谢
我正在运行运行时 8.1(包括 Apache Spark 3.1.1、Scala 2.12),试图让 hyperopt 按定义工作
py4j.Py4JException: Method maxNumConcurrentTasks([]) does not exist
Run Code Online (Sandbox Code Playgroud)
当我尝试
spark_trials = SparkTrials()
Run Code Online (Sandbox Code Playgroud)
我需要做什么特别的事情才能使其正常工作吗?
这是我正在使用的集群
{
"autoscale": {
"min_workers": 1,
"max_workers": 2
},
"cluster_name": "mlops_tiny_ml",
"spark_version": "8.2.x-cpu-ml-scala2.12",
"spark_conf": {},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-west-2b",
"instance_profile_arn": "arn:aws:iam::112437402463:instance-profile/databricks_instance_role_s3",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 3,
"ebs_volume_size": 100
},
"node_type_id": "m4.large",
"driver_node_type_id": "m4.large",
"ssh_public_keys": [],
"custom_tags": {},
"spark_env_vars": {},
"autotermination_minutes": 120,
"enable_elastic_disk": false,
"cluster_source": "UI",
"init_scripts": [],
"cluster_id": "0xxxxxt404"
}
Run Code Online (Sandbox Code Playgroud)
我试图找出为什么在使用 Databricks Job API 时出现以下错误。
{ "error_code": "INVALID_PARAMETER_VALUE", "message": "集群验证错误:缺少必填字段:settings.cluster_spec.new_cluster.size" }
我做了什么:
{
"new_cluster": {
"spark_version": "7.5.x-scala2.12",
"spark_conf": {
"spark.master": "local[*]",
"spark.databricks.cluster.profile": "singleNode"
},
"azure_attributes": {
"availability": "ON_DEMAND_AZURE",
"first_on_demand": 1,
"spot_bid_max_price": -1
},
"node_type_id": "Standard_DS3_v2",
"driver_node_type_id": "Standard_DS3_v2",
"custom_tags": {
"ResourceClass": "SingleNode"
},
"enable_elastic_disk": true
},
"libraries": [
{
"pypi": {
"package": "koalas==1.5.0"
}
}
],
"notebook_task": {
"notebook_path": "/pathtoNotebook/TheNotebook",
"base_parameters": {
"param1": "test"
}
}, …Run Code Online (Sandbox Code Playgroud) 对 Databricks 中的索引如何工作感到好奇。您能否将分区视为索引,因为它有效地将数据组织在分组的子类别中?
我试图将 parquet 文件转换为 Excel 文件。但是,当我尝试使用 pandas 或openpyxl引擎这样做时,它显示“ Operation not supported”错误。但是,我可以使用databricks 中的openpyxl引擎读取 excel 文件。
在阅读以下代码时,它正在工作:
xlfile = '/dbfs/mnt/raw/BOMFILE.xlsx'
tmp_csv = '/dbfs/mnt/trusted/BOMFILE.csv'
pdf = pd.DataFrame(pd.read_excel(xlfile, engine='openpyxl'))
pdf.to_csv (tmp_csv, index = None, header=True)
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试使用 openpyxl 和 xlswriter 编写相同的内容时,它不起作用:
parq = '/mnt/raw/PRODUCT.parquet'
final = '/dbfs/mnt/trusted/PRODUCT.xlsx'
df = spark.read.format("parquet").option("header", "true").load(parq)
pandas_df = df.toPandas()
pandas_df.to_excel(final, engine='openpyxl')
#pandas_df.to_excel(outfile, engine='xlsxwriter')#, sheet_name=tbl)
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
FileCreateError: [Errno 95] Operation not supported
OSError: [Errno 95] Operation not supported
During handling of the above exception, another exception occurred: …Run Code Online (Sandbox Code Playgroud) 我已经设置了 Amundsen,并且 UI 工作正常。我正在尝试运行其存储库中的示例中给出的示例 Delta Lake 加载程序。
"""
This is a example script for extracting Delta Lake Metadata Results
"""
from pyhocon import ConfigFactory
from pyspark.sql import SparkSession
from databuilder.extractor.delta_lake_metadata_extractor import DeltaLakeMetadataExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://localhost:7687/'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
cluster_key = 'my_delta_environment'
database = 'delta'
# Or …Run Code Online (Sandbox Code Playgroud) 我将以下代码打包到 whl 文件中:
from pkg_resources import resource_filename
def path_to_model(anomaly_dir_name: str, data_path: str):
filepath = resource_filename(anomaly_dir_name, data_path)
return filepath
def read_data(spark) -> DataFrame:
return (spark.read.parquet(str(path_to_model("sampleFolder", "data"))))
Run Code Online (Sandbox Code Playgroud)
我确认whl文件正确包含sampleFolder/data/目录下的镶木地板文件。当我在本地运行它时,它可以工作,但是当我将此 whl 文件上传到 dbfs 并运行时,我收到此错误:
AnalysisException: Path does not exist: dbfs:/databricks/python/lib/python3.7/site-packages/sampleFolder/data;
Run Code Online (Sandbox Code Playgroud)
我确认这个目录实际上不存在: dbfs:/databricks/python 知道这个错误可能是什么吗?
谢谢。
我有一个笔记本,它将处理该文件并以结构化格式创建数据框。现在我需要导入在另一个笔记本中创建的数据框,但问题是在运行笔记本之前我需要验证仅适用于我需要运行的某些场景。
通常要导入所有数据结构,我们使用%run。但就我而言,它应该是 if 子句和 then notebook run 的组合
if "dataset" in path": %run ntbk_path
它给出错误“路径不存在”
if "dataset" in path": dbutils.notebook.run(ntbk_path)
这个我无法获得所有的数据结构。
有人可以帮我解决这个错误吗?
databricks ×10
apache-spark ×6
pyspark ×4
python ×3
delta-lake ×2
api ×1
azure ×1
caching ×1
corrupt ×1
hyperopt ×1
indexing ×1
jobs ×1
pandas ×1
parquet ×1
python-wheel ×1
xlsxwriter ×1