当 时spark.memory.offheap.enabled=true,Spark 可以利用堆外内存进行洗牌和缓存 ( StorageLevel.OFF_HEAP)。堆外内存可以用来存储广播变量吗?如何?
简而言之,不,您不能用于StorageLevel.OFF_HEAP广播变量。
要了解原因,让我们查看该方法的源代码SparkContext.broadcast(...)。
/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ...
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
:
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
:
bc
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,broadcastManager.newBroadcast(...)创建对象的Broadcast是该方法的返回类型。
现在,让我们更深入地研究并检查newBroadcast()。
def newBroadcast(value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,broadcastManager有一个名为 的组件broadcastFactory,并使用抽象工厂设计模式将广播变量的创建委托给其相关工厂。
另请注意,它BroadcastManager会跟踪id每个broadcast变量的唯一性,并且每个新的广播变量都会递增该变量。
目前spark中可以初始化的只有一种BroadcastFactory,那就是TorrentBroadcastFactory. 这可以在的初始化代码中看到BroadcastManager。
// Called by SparkContext or Executor before using Broadcast
private def initialize() {
:
broadcastFactory = new TorrentBroadcastFactory
:
}
Run Code Online (Sandbox Code Playgroud)
引用源码TorrentBroadcastFactory
使用类似 BitTorrent 的协议将广播数据分布式传输到执行器的广播实现
这个特定的工厂使用TorrentBroadcast。这个类的描述非常丰富。
驱动程序将序列化对象分成小块,并将这些块存储在驱动程序的 BlockManager 中。
在每个执行器上,执行器首先尝试从其 BlockManager 中获取对象。如果不存在,则它会使用远程获取从驱动程序和/或其他执行器(如果可用)获取小块。一旦它获取了块,它就会将块放入自己的 BlockManager 中,以供其他执行器从中获取。这可以防止驱动程序成为发送多个广播数据副本(每个执行程序一个)的瓶颈。
读取类的writeBlock函数,我们可以看到该广播的TorrentBroadcast硬编码选项。StorageLevel.MEMORY_AND_DISK_SER
/**
* Divide the object into multiple blocks and put those blocks in the block manager.
*
* @param value the object to divide
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(value: T): Int = {
import StorageLevel._
:
:
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId " + s"in local BlockManager")
}
:
:
Run Code Online (Sandbox Code Playgroud)
因此,由于此代码使用 的硬编码值StorageLevel.MEMORY_AND_DISK_SER,因此我们不能用于StorageLevel.OFF_HEAP广播变量。
| 归档时间: |
|
| 查看次数: |
3078 次 |
| 最近记录: |