Dru*_*rBg 14 scala apache-spark
Spark专家的一个好问题.
我正在处理map操作(RDD)中的数据.在mapper函数中,我需要查找类的对象A以用于处理RDD中的元素.
由于这将在执行程序上执行,并且类型元素A(将被查找)的创建恰好是一项昂贵的操作,我想在每个执行程序上预加载和缓存这些对象.这样做的最佳方式是什么?
一个想法是广播查找表,但类A不可序列化(无法控制其实现).
另一个想法是将它们加载到单个对象中.但是,我想控制加载到查找表中的内容(例如,可能在不同的Spark作业上有不同的数据).
理想情况下,我想指定一次将在执行程序上加载的内容(包括Streaming的情况,以便查找表在批处理之间保留在内存中),通过驱动程序在启动期间可用的参数,数据得到处理.
是否有干净优雅的方式或无法实现?
这正是broadcast. 广播变量的目标用例,传输一次并使用种子有效地移动到所有执行器,并保留在内存/本地磁盘中,直到您不再需要它们为止.
在使用其他人的界面时,序列化经常会成为一个问题.如果您可以强制执行您使用的对象是可序列化的,那么这将是最佳解决方案.如果这是不可能的,那么你的生活会变得更加复杂.如果无法序列化A对象,则必须在执行程序上为每个任务创建它们.如果它们存储在某个文件中,则看起来像:
rdd.mapPartitions { it =>
val lookupTable = loadLookupTable(path)
it.map(elem => fn(lookupTable, elem))
}
Run Code Online (Sandbox Code Playgroud)
请注意,如果您正在使用此模型,则必须为每个任务加载一次查找表 - 您无法从广播变量的跨任务持久性中受益.
编辑:这是另一个模型,我相信让你可以跨每个JVM的任务共享查找表.
class BroadcastableLookupTable {
@transient val lookupTable: LookupTable[A] = null
def get: LookupTable[A] = {
if (lookupTable == null)
lookupTable = < load lookup table from disk>
lookupTable
}
}
Run Code Online (Sandbox Code Playgroud)
这个类可以广播(没有实质性的传输),并且第一次按JVM调用它,你将加载查找表并返回它.
| 归档时间: |
|
| 查看次数: |
1779 次 |
| 最近记录: |