How can unpersisting an RDD cause an RPC timeout?

B. *_*ith 5 scala apache-spark

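I have a very large RDD that I am caching (it still fits in memory), but because it is so large I want to unpersist it as soon as possible. However, when I call unpersist on it, I get an RPC timeout error: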

17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
        at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)

17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout

The code that triggers this error looks like this:

val transformation1 = firstTransformation(inputData).cache
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist()

Unpersisting an RDD should be a relatively cheap operation. How can unpersisting an RDD cause an RPC timeout?

小智 0

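unpersist() is a blocking call by default, so it waits until all of the RDD's blocks have been removed. Once the driver issues the instruction to unpersist the RDD, it asks every executor to drop its blocks and then waits for their responses. Those responses can time out for a number of reasons, such as network problems.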
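The difference between waiting for a reply and not waiting can be sketched with plain Scala futures, with no Spark involved. The names `removeBlocks`, `blockingCall` and `nonBlockingCall` are hypothetical stand-ins, not Spark APIs, and the reply is assumed never to arrive:

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

object BlockingVsNonBlocking {
  // Hypothetical stand-in for an executor reply that never arrives.
  def removeBlocks(): Future[Int] = Promise[Int]().future

  // Blocking style: wait for the reply and fail if it is not there in time.
  def blockingCall(timeout: FiniteDuration): Either[String, Int] =
    try Right(Await.result(removeBlocks(), timeout))
    catch { case _: TimeoutException => Left(s"timed out after $timeout") }

  // Non-blocking style: send the request and return immediately.
  def nonBlockingCall(): Future[Int] = removeBlocks()
}
```

In this sketch only the blocking variant can surface a timeout to the caller; the non-blocking variant returns at once whether or not the reply ever comes.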

To avoid the exception, you can make the unpersist() call non-blocking, so the driver no longer waits for the executors:

val transformation1 = firstTransformation(inputData).cache
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist(blocking = false)

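With a non-blocking unpersist, the RDD is only marked as removed; its blocks may be cleaned up later, either during GC or when Spark's background thread carries out the unpersist.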
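Putting the pieces together, here is a minimal sketch of the pattern, assuming a local Spark session and two placeholder transformations (`map` and `filter`) in place of the question's `firstTransformation` and `secondTransformation`. The `600s` value for `spark.rpc.askTimeout` is an arbitrary assumption, included only because the error message names that setting. The sketch also counts `transformation2` before releasing its parent; that step is not in the original code, but without it the cached parent may be dropped before the child has been computed from it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object NonBlockingUnpersist {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for illustration only
      .appName("non-blocking-unpersist")
      // Assumption: allow more time than the 120 seconds seen in the log.
      .config("spark.rpc.askTimeout", "600s")
      .getOrCreate()
    val sc = spark.sparkContext

    // Placeholder for firstTransformation(inputData).
    val transformation1 = sc.parallelize(1 to 1000000).map(_ * 2).cache()
    println("Transformation1 Count: " + transformation1.count())

    // Placeholder for secondTransformation(transformation1).
    val transformation2 = transformation1.filter(_ % 3 == 0).cache()
    // Materialize the child while the parent is still cached.
    println("Transformation2 Count: " + transformation2.count())

    // Ask for removal without waiting for the executors to reply.
    transformation1.unpersist(blocking = false)
    // The driver-side storage level is cleared right away.
    assert(transformation1.getStorageLevel == StorageLevel.NONE)

    spark.stop()
  }
}
```

Note that the non-blocking call does not make block removal itself any faster; it only stops the driver from waiting on it, so a slow or failed removal would be expected to show up as a warning in the log rather than as an exception in the main thread.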