Why does rdd.map(identity).cache become slow when the RDD items are large?

Juh*_*uh_ 2 performance caching apache-spark

I found that calling .map( identity ).cache on an RDD becomes very slow when the items are large, whereas otherwise it is almost instantaneous.

Note: this is probably related to this question, but here I provide a very precise example (which can be run directly in the spark-shell):

// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count )                 // around 12 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!

My first guess was that this was the time needed to create the new RDD (the container). But if I use an RDD of the same size whose items contain very little data, the difference in execution time is tiny:

val rdd = sc.parallelize(1 to n).cache
rdd.count

profile( rdd.count )                 // around 9 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 15 ms

So it looks like caching is actually copying the data. I thought it might also be losing time serializing it, but I checked that cache is used with the default MEMORY_ONLY persistence:

import org.apache.spark.storage.StorageLevel
rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true

=> So, is it caching that duplicates the data, or is it something else?

This is actually a major limitation for my application, which started out with a design along the lines of rdd = rdd.map(f: Item => Item).cache, with many such functions applied in an arbitrary order (an order I cannot determine in advance).
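
For illustration, a minimal sketch of that pattern (Item and steps are placeholders, not the real functions), reusing bigContent() and n from the snippet above:

import org.apache.spark.rdd.RDD

type Item = scala.collection.immutable.IndexedSeq[Map[Int, Int]]   // same shape as bigContent()
val steps: Seq[Item => Item] = Seq(identity, identity)             // stand-ins for the real functions

var pipeline: RDD[Item] = sc.parallelize(1 to n).map(_ => bigContent())
for (f <- steps) {
  pipeline = pipeline.map(f).cache   // every step pays the slow caching cost measured above
}
pipeline.count                       // materializes the whole chain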

I am using Spark 1.6.0.

Edit

When I look at the Spark UI -> Stages tab -> last stage (i.e. 4), all tasks show roughly the same data:

  • duration = 3s (it went down to 3s, but that's still 2.9s too many :-\)
  • scheduler delay = 10ms
  • task deserialization = 20ms
  • GC = 0.1s (present on all tasks, but why is GC triggered at all?)
  • result serialization = 0ms
  • getting result = 0ms
  • peak exec mem = 0.0 B
  • input size = 7.0 MB / 125
  • no errors

Den*_*Huo 10

Running jstack on one of the org.apache.spark.executor.CoarseGrainedExecutorBackend processes during the slow caching shows the following:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

This SizeEstimator being one of the main costs of caching something that is ostensibly already in memory makes sense, because properly estimating the size of an unknown object can be fairly difficult; if you look inside the visitSingleObject method, you can see that it relies heavily on reflection, calling getClassInfo to access runtime type information; not only is the full object hierarchy traversed, but every nested member is checked against an IdentityHashMap to detect which references point to the same concrete object instance, which is why the stack traces show so much time spent in those IdentityHashMap operations.

In the case of your example objects, you essentially have each item as a list of maps from wrapped integers to wrapped integers; presumably Scala's implementation of the inner maps holds an array as well, which explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit, and the SizeEstimator sets up a fresh IdentityHashMap for each object it samples.
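
To give a feel for what such a traversal involves, here is a heavily simplified sketch (my own illustration, not Spark's actual SizeEstimator code) of a reflective object-graph walk that uses an IdentityHashMap to de-duplicate shared references; even merely counting the reachable objects this way gets expensive for the nested Map structure in the question:

import java.util.IdentityHashMap
import java.lang.reflect.Modifier
import scala.collection.mutable

// Breadth-first walk of every object reachable from `root`, reading fields via
// reflection and counting each concrete instance exactly once.
object NaiveObjectGraphWalker {
  def countReachable(root: AnyRef): Long = {
    val visited = new IdentityHashMap[AnyRef, AnyRef]()
    val queue = mutable.Queue[AnyRef](root)
    var count = 0L
    while (queue.nonEmpty) {
      val obj = queue.dequeue()
      if (obj != null && !visited.containsKey(obj)) {
        visited.put(obj, obj)           // identity-based, so shared instances are counted once
        count += 1
        val cls = obj.getClass
        if (cls.isArray) {
          // enqueue every element of a reference array; primitive arrays have no inner objects
          if (!cls.getComponentType.isPrimitive) {
            val len = java.lang.reflect.Array.getLength(obj)
            var i = 0
            while (i < len) {
              val elem = java.lang.reflect.Array.get(obj, i)
              if (elem != null) queue.enqueue(elem)
              i += 1
            }
          }
        } else {
          // walk all declared fields up the whole class hierarchy via reflection
          var c: Class[_] = cls
          while (c != null) {
            for (f <- c.getDeclaredFields
                 if !Modifier.isStatic(f.getModifiers) && !f.getType.isPrimitive) {
              f.setAccessible(true)     // may be restricted on recent JVMs; fine for a sketch
              val value = f.get(obj)
              if (value != null) queue.enqueue(value)
            }
            c = c.getSuperclass
          }
        }
      }
    }
    count
  }
}

// e.g. NaiveObjectGraphWalker.countReachable(bigContent()) already visits millions of objects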

In the case you measured:

profile( rdd.cache.count )

this doesn't count as exercising the caching logic, because the RDD was already successfully cached and Spark is smart enough not to re-run it. You can actually isolate the exact cost of the caching logic, independently of the extra map(identity) transformation, by profiling the creation and caching of a new RDD directly; here is my Spark session continuing right after your last lines:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms                                                                   
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms                                                                  
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms                                                                  
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms                                                                  
res7: Long = 1000

So you can see that the slowness isn't really from going through a map transformation per se; rather, in this case the ~6s appears to be the fundamental cost of running the caching logic over 1000 objects when each object contains something like ~1,000,000 to ~10,000,000 inner objects (each item holds 1000 maps of 1000 entries, i.e. ~1,000,000 entries, and every entry, boxed key, and boxed value is its own object; the exact count depends on how the Map implementation is laid out). For example, the extra visitArray nesting at the top of the second stack trace hints that the HashMap impl contains nested arrays, which makes sense for a typical densely-packed linear-probing data structure inside each hashtable entry.

For your specific use case, you should err on the side of caching lazily if at all possible: the overhead of caching an intermediate result is only a good tradeoff if you are going to reuse it for many separate downstream transformations. But as you mention in your question, if you really do use one RDD to branch out into many different downstream transformations, you may indeed need the caching step if the original transformations are expensive.
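
To make that tradeoff concrete, here is a hedged sketch (reusing n and bigContent() from the question; the shape of the lineage, not the exact operations, is the point) of caching only where the lineage actually branches:

// Linear chain with no reuse: skip caching and let Spark pipeline the maps in a single pass.
val base = sc.parallelize(1 to n).map(_ => bigContent())
val chained = base.map(identity).map(identity)
chained.count

// Branch point: one transformed RDD feeds several downstream actions, so paying the
// caching cost once here is worthwhile.
val shared = base.map(identity).cache()
val branchA = shared.filter(_.nonEmpty).count    // first action materializes the cached blocks
val branchB = shared.map(_.size).reduce(_ + _)   // second action reuses them instead of recomputing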

The workaround is to try to use internal data structures that are amenable to constant-time size computation (e.g. arrays of primitives), where you can save a lot of cost by avoiding iterating over huge numbers of wrapper objects and relying on reflection over them in the SizeEstimator.

I tried things like Array[Array[Int]]; even though there is still nonzero overhead, it is about 10x better for a similar data size:

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms                                                                    
res20: Long = 1000

To illustrate just how bad the reflection cost of any higher-level objects is: if I drop the last toArray, so that each bigContent ends up as a scala.collection.immutable.IndexedSeq[Array[Int]], performance goes back to within about 2x of the slowness of the original IndexedSeq[Map[Int,Int]] case:

scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms                                                                   
res25: Long = 1000

As discussed in the comments, you can also consider using the MEMORY_ONLY_SER StorageLevel where, as long as there is an efficient serializer, it will quite likely be cheaper than the recursive reflection used in the SizeEstimator; to do that, you just replace cache() with persist(StorageLevel.MEMORY_ONLY_SER); as mentioned in this other question, cache() is conceptually the same as persist(StorageLevel.MEMORY_ONLY).

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )

I actually tried this on both Spark 1.6.1 and a Spark 2.0.0 preview, with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image versions, respectively). Unfortunately, the MEMORY_ONLY_SER trick doesn't appear to help in Spark 1.6.1:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms                                                                   
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms                                                                   
res21: Long = 1000

But in the Spark 2.0.0 preview it appears to improve performance by 10x:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms                                                                   
res20: Long = 1000

This could vary depending on your objects, though; the speedup is only expected if the serialization itself doesn't rely heavily on reflection; if you are able to use Kryo serialization effectively, then you may well see an improvement with MEMORY_ONLY_SER for these large objects.
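
For completeness, here is a sketch of what a standalone app enabling Kryo together with MEMORY_ONLY_SER could look like (the app skeleton and names are illustrative; spark.serializer is a standard Spark setting, and whether this actually beats plain cache() depends on the Spark version, as in the timings above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object KryoMemoryOnlySerExample {
  def bigContent() = (1 to 1000).map(i => (1 to 1000).map(j => (i, j)).toMap)

  def main(args: Array[String]): Unit = {
    // Kryo is enabled through a standard Spark setting; in spark-shell the same value
    // would instead be passed via --conf spark.serializer=... on the command line.
    val conf = new SparkConf()
      .setAppName("kryo-memory-only-ser")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 1000)
      .map(_ => bigContent())
      .persist(StorageLevel.MEMORY_ONLY_SER)  // store serialized bytes, not deserialized objects
    println(rdd.count)                        // whether this beats cache() depends on the Spark version

    sc.stop()
  }
}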