Spark执行器上的对象缓存

Dru*_*rBg 14 scala apache-spark

Spark专家的一个好问题.

我正在处理map操作(RDD)中的数据.在mapper函数中,我需要查找类的对象A以用于处理RDD中的元素.

由于这将在执行程序上执行,并且类型元素A(将被查找)的创建恰好是一项昂贵的操作,我想在每个执行程序上预加载和缓存这些对象.这样做的最佳方式是什么?

  • 一个想法是广播查找表,但类A不可序列化(无法控制其实现).

  • 另一个想法是将它们加载到单个对象中.但是,我想控制加载到查找表中的内容(例如,可能在不同的Spark作业上有不同的数据).

理想情况下,我想指定一次将在执行程序上加载的内容(包括Streaming的情况,以便查找表在批处理之间保留在内存中),通过驱动程序在启动期间可用的参数,数据得到处理.

是否有干净优雅的方式或无法实现?

Tim*_*Tim 5

这正是broadcast. 广播变量的目标用例,传输一次并使用种子有效地移动到所有执行器,并保留在内存/本地磁盘中,直到您不再需要它们为止.

在使用其他人的界面时,序列化经常会成为一个问题.如果您可以强制执行您使用的对象是可序列化的,那么这将是最佳解决方案.如果这是不可能的,那么你的生活会变得更加复杂.如果无法序列​​化A对象,则必须在执行程序上为每个任务创建它们.如果它们存储在某个文件中,则看起来像:

rdd.mapPartitions { it => 
  val lookupTable = loadLookupTable(path)
  it.map(elem => fn(lookupTable, elem))
}
Run Code Online (Sandbox Code Playgroud)

请注意,如果您正在使用此模型,则必须为每个任务加载一次查找表 - 您无法从广播变量的跨任务持久性中受益.

编辑:这是另一个模型,我相信让你可以跨每个JVM的任务共享查找表.

class BroadcastableLookupTable {
  @transient val lookupTable: LookupTable[A] = null

  def get: LookupTable[A] = {
    if (lookupTable == null)
      lookupTable = < load lookup table from disk>
    lookupTable
  }
}
Run Code Online (Sandbox Code Playgroud)

这个类可以广播(没有实质性的传输),并且第一次按JVM调用它,你将加载查找表并返回它.