如何在spark中执行初始化?

jbr*_*own 6 scala apache-spark

我想在spark中执行我的数据的geoip查找.要做到这一点,我正在使用MaxMind的geoIP数据库.

我想要做的是在每个分区上初始化一次geoip数据库对象,然后使用它来查找与IP地址相关的城市.

spark是否有每个节点的初始化阶段,或者我应该检查实例变量是否未定义,如果是,请在继续之前初始化它?例如(这是python,但我想要一个scala解决方案):

class IPLookup(object):
    database = None

    def getCity(self, ip):
      if not database:
        self.database = self.initialise(geoipPath)
  ...
Run Code Online (Sandbox Code Playgroud)

当然,这样做需要spark会将整个对象序列化,这是文档提醒的.

maa*_*asg 6

在Spark中,每个分区操作可以使用:

def mapPartitions[U](f: (Iterator[T]) ? Iterator[U], preservesPartitioning: Boolean = false)
Run Code Online (Sandbox Code Playgroud)

此映射器将在f元素迭代器上为每个分区执行一次该函数.我们的想法是,在迭代器中的许多元素上使用这些资源会抵消设置资源(如数据库连接)的成本.

例:

val logsRDD = ???
logsRDD.mapPartitions{iter =>
   val geoIp = new GeoIPLookupDB(...)
   // this is local map over the iterator - do not confuse with rdd.map
   iter.map(elem => (geoIp.resolve(elem.ip),elem)) 
}
Run Code Online (Sandbox Code Playgroud)


bea*_*ito 2

这似乎是广播变量的一个很好的用法。您是否查看过该功能的文档?如果查看过,它是否在某种程度上无法满足您的要求?