我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)
目标是处理约 5.5 TB 的数据
我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟
我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:
代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)
val jsonDF = spark.read.json("/mnt/myfile")
val res = jsonDF
.withColumn("row", row_number.over(w))
.where($"row" === 1)
.drop("row")
res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)
然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql databricks azure-databricks
我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。
结构列的架构如下所示:
{
"UUID_KEY": {
"time": STRING
"amount": INTEGER
}
}
Run Code Online (Sandbox Code Playgroud)
数据如下:
| ID | json_列 |
|---|---|
| 1 | “{1:{金额:1,时间:2},2:{金额:10,时间:5}}” |
| 2 | “{3:{金额:1,时间:2},4:{金额:10,时间:5}” |
目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用,因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amount和time。
有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time
我在databricks中创建了一个sql视图。是否可以将视图分配给 python 数据框?
我有一个数据框,并希望删除所有括号,并替换为两个连字符.
之前:
+------------+
| dob_concat|
+------------+
|[1983][6][3]|
+------------+
Run Code Online (Sandbox Code Playgroud)
后:
+------------+
| dob_concat |
+------------+
| 1983-6-3 |
+------------+
Run Code Online (Sandbox Code Playgroud) 已使用 databricks 中的 pyspark 在 Azure Blob 存储中创建了按日期分区的 Parquet 文件,但在一个日期文件夹中收到了如此多的文件,例如 500 个文件。我需要使用 PySpark 减少文件数量,例如一个日期文件夹中的 10 或 15 个文件。
df.write.format("parquet").mode("overwrite").partitionBy("Date").save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
我尝试过coalesce:
df.write.format("parquet").mode("overwrite").partitionBy("Date").coalesce(15).save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
但会抛出错误:
AttributeError:“DataFrameWriter”对象没有属性“coalesce”
请帮忙。