Spark - 使用堆外内存

maz*_*cha 3 apache-spark

当 时spark.memory.offheap.enabled=true,Spark 可以利用堆外内存进行洗牌和缓存 ( StorageLevel.OFF_HEAP)。堆外内存可以用来存储广播变量吗?如何?

m_v*_*uri 7

简而言之,,您不能用于StorageLevel.OFF_HEAP广播变量。

要了解原因,让我们查看该方法的源代码SparkContext.broadcast(...)

/**
  * Broadcast a read-only variable to the cluster, returning a
  * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ... 
  */
 def broadcast[T: ClassTag](value: T): Broadcast[T] = {
   :
   val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
   :
   bc
 }
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,broadcastManager.newBroadcast(...)创建对象的Broadcast是该方法的返回类型。

现在,让我们更深入地研究并检查newBroadcast()

def newBroadcast(value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,broadcastManager有一个名为 的组件broadcastFactory,并使用抽象工厂设计模式将广播变量的创建委托给其相关工厂。

另请注意,它BroadcastManager会跟踪id每个broadcast变量的唯一性,并且每个新的广播变量都会递增该变量。

目前spark中可以初始化的只有一种BroadcastFactory,那就是TorrentBroadcastFactory. 这可以在的初始化代码中看到BroadcastManager

// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  :
  broadcastFactory = new TorrentBroadcastFactory
  :
}
Run Code Online (Sandbox Code Playgroud)

引用源码TorrentBroadcastFactory

使用类似 BitTorrent 的协议将广播数据分布式传输到执行器的广播实现

这个特定的工厂使用TorrentBroadcast。这个类的描述非常丰富。

驱动程序将序列化对象分成小块,并将这些块存储在驱动程序的 BlockManager 中。

在每个执行器上,执行器首先尝试从其 BlockManager 中获取对象。如果不存在,则它会使用远程获取从驱动程序和/或其他执行器(如果可用)获取小块。一旦它获取了块,它就会将块放入自己的 BlockManager 中,以供其他执行器从中获取。这可以防止驱动程序成为发送多个广播数据副本(每个执行程序一个)的瓶颈。

读取类的writeBlock函数,我们可以看到该广播的TorrentBroadcast硬编码选项。StorageLevel.MEMORY_AND_DISK_SER

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   *
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    :
    :
    if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
      throw new SparkException(s"Failed to store $pieceId of $broadcastId " + s"in local BlockManager")
    }
    :
    :
Run Code Online (Sandbox Code Playgroud)

因此,由于此代码使用 的硬编码值StorageLevel.MEMORY_AND_DISK_SER,因此我们不能用于StorageLevel.OFF_HEAP广播变量。