jbr*_*own 6 scala apache-spark
我想在spark中执行我的数据的geoip查找.要做到这一点,我正在使用MaxMind的geoIP数据库.
我想要做的是在每个分区上初始化一次geoip数据库对象,然后使用它来查找与IP地址相关的城市.
spark是否有每个节点的初始化阶段,或者我应该检查实例变量是否未定义,如果是,请在继续之前初始化它?例如(这是python,但我想要一个scala解决方案):
class IPLookup(object):
database = None
def getCity(self, ip):
if not database:
self.database = self.initialise(geoipPath)
...
Run Code Online (Sandbox Code Playgroud)
当然,这样做需要spark会将整个对象序列化,这是文档提醒的.
在Spark中,每个分区操作可以使用:
def mapPartitions[U](f: (Iterator[T]) ? Iterator[U], preservesPartitioning: Boolean = false)
Run Code Online (Sandbox Code Playgroud)
此映射器将在f
元素迭代器上为每个分区执行一次该函数.我们的想法是,在迭代器中的许多元素上使用这些资源会抵消设置资源(如数据库连接)的成本.
例:
val logsRDD = ???
logsRDD.mapPartitions{iter =>
val geoIp = new GeoIPLookupDB(...)
// this is local map over the iterator - do not confuse with rdd.map
iter.map(elem => (geoIp.resolve(elem.ip),elem))
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2710 次 |
最近记录: |