Abh*_*hek 1 apache-spark pyspark databricks
我正在对 Databricks 中托管的巨大 Delta 表的数据框进行非常简单的操作。我面临的问题是,运行几个小时后,代码失败,并显示错误“作业因阶段失败而中止:59014 个任务的序列化结果的总大小 (4.0 GiB) 大于spark.driver.maxResultSize 4.0 GiB” ”。
我正在执行的任务是,我读取数据框中的增量表,添加一个新列用于创建存储桶(20个存储桶),并以覆盖模式保存表,将存储桶添加为另一个分区(已经有3个分区,这个新分区)列将是第四个分区)。因此,我没有在应用程序代码中执行任何会导致大量数据返回驱动程序的操作。下面给出的是示例代码
bucket_number = 20
inputDataframe = spark.read.table("huge_delta_table")
inputDataframe = inputDataframe.withColumn("bucket_key", (translate( substring(col("some_column"), 0, 16), "abcdefghijklmnopqrstuvwxyz", "01234567890123456789012345").cast(LongType()) %buckets_number)+1)
inputDatafrme.write.format("delta")
input.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("existing_partition_column1","existing_partition_column2","existing_partition_column3","bucket_key") \
.saveAsTable("huge_delta_table")
Run Code Online (Sandbox Code Playgroud)
我想知道,是否是因为大量的任务导致 Spark 的内部结果元数据变得巨大(当出于协调目的而传回驱动程序时)?
正如这篇SO post的答案所讨论的,当您处理大量任务(您有 59k)时,您可能会遇到这个问题,因为每个任务都会向驱动程序发送一些数据。引用那个答案:
Spark 会尝试将数据发送回驱动程序,而不仅仅是当您显式调用收集时。如果您使用累加器,它还会发送回每个任务的累加器结果、广播连接的数据以及有关每个任务的一些小状态数据。如果您有很多分区(根据我的经验超过 20k),您有时会看到此错误
我们可以看一下源码,看看这个错误是在哪里触发的。错误是在的方法中的这一行触发的。TaskSetManager.scala
canFetchMoreResults
幸运的是,该canFetchMoreResults
方法仅在 1 个位置被调用:在驱动程序上运行的enqueueSuccessfulTask 中。TaskResultGetter
无需将整个函数的内容粘贴到此处,我们可以找到一条有趣的信息:
// Set the task result size in the accumulator updates received from the executors.
// We need to do this here on the driver because if we did this on the executors then
// we would have to serialize the result again after updating the size.
result.accumUpdates = result.accumUpdates.map { a =>
if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
val acc = a.asInstanceOf[LongAccumulator]
assert(acc.sum == 0L, "task result size should not have been set on the executors")
acc.setValue(size.toLong)
acc
} else {
a
}
}
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,对于每个任务,我们都会向驱动程序发送一些数据。
您的问题是您的设置有太多任务spark.driver.maxResultSize
。您的分区大小是多少?通常,100MB 左右的分区大小是一个健康的大小。尝试以下操作:
spark.files.maxPartitionBytes
参数设置得更大一些(例如,默认大小的两倍),因为您似乎没有对它们进行任何疯狂的计算。spark.driver.maxResultSize
大于您拥有的大小,或者将其设置为 0 以实现无限大小。请注意,您可能需要增加驱动程序的内存以避免 OOM 错误。 归档时间: |
|
查看次数: |
1313 次 |
最近记录: |