标签: databricks

Databricks SparkException超过spark.driver.maxResultSize

我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)

目标是处理约 5.5 TB 的数据

我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟

我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:

  • 有没有更好的分区方式?
  • 如何解决这个问题?

代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

val jsonDF = spark.read.json("/mnt/myfile")

val res = jsonDF
      .withColumn("row", row_number.over(w))
      .where($"row" === 1)
      .drop("row")

res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)

然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql databricks azure-databricks

0
推荐指数
1
解决办法
4292
查看次数

在 PySpark 中使用动态键展平嵌套 JSON 结构

我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。

结构列的架构如下所示:

{
  "UUID_KEY": {
     "time": STRING
     "amount": INTEGER
  }
}
Run Code Online (Sandbox Code Playgroud)

数据如下:

ID json_列
1 “{1:{金额:1,时间:2},2:{金额:10,时间:5}}”
2 “{3:{金额:1,时间:2},4:{金额:10,时间:5}”

目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amounttime

有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time

json apache-spark apache-spark-sql pyspark databricks

0
推荐指数
1
解决办法
718
查看次数

Databricks:将 sql 视图转换为 python 数据框

我在databricks中创建了一个sql视图。是否可以将视图分配给 python 数据框?

python sql dataframe databricks

-1
推荐指数
1
解决办法
1217
查看次数

使用PySpark删除和替换字符

我有一个数据框,并希望删除所有括号,并替换为两个连字符.

之前:

+------------+
|  dob_concat|
+------------+
|[1983][6][3]|
+------------+
Run Code Online (Sandbox Code Playgroud)

后:

+------------+
| dob_concat |
+------------+
| 1983-6-3   |
+------------+
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark databricks

-2
推荐指数
1
解决办法
3363
查看次数

减少 pyspark 中的 parquet 文件数量

已使用 databricks 中的 pyspark 在 Azure Blob 存储中创建了按日期分区的 Parquet 文件,但在一个日期文件夹中收到了如此多的文件,例如 500 个文件。我需要使用 PySpark 减少文件数量,例如一个日期文件夹中的 10 或 15 个文件。

df.write.format("parquet").mode("overwrite").partitionBy("Date").save(
    "/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)

我尝试过coalesce

df.write.format("parquet").mode("overwrite").partitionBy("Date").coalesce(15).save(
    "/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)

但会抛出错误:

AttributeError:“DataFrameWriter”对象没有属性“coalesce”

请帮忙。

apache-spark pyspark azure-blob-storage databricks

-2
推荐指数
1
解决办法
1004
查看次数