在序列化的Spark中使用maxmind geoip

Gam*_*ows 1 scala geoip apache-spark

我正在尝试将MaxMind GeoIP API用于scala-spark,该https://github.com/snowplow/scala-maxmind-iplookups找到了。我使用标准加载文件:

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
Run Code Online (Sandbox Code Playgroud)

我有一个基本的csv文件,该文件中包含时间和IP地址:

val sweek1 = week1.map{line=> IP(parse(line))}.collect{
  case Some(ip) => {
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
    }
}
Run Code Online (Sandbox Code Playgroud)

ipdetect函数的基本定义是:

def ipdetect(a:String)={
  ipLookups.performLookups(a)._1 match{
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}
Run Code Online (Sandbox Code Playgroud)

当我运行该程序时,它提示“任务不可序列化”。所以我读了几篇文章,似乎有一些解决方法。

1,包装 2,使用SparkContext.addFile(在整个群集中分发文件)

但是我无法弄清楚它们中的任何一个是如何工作的,我尝试了包装器,但是我不知道如何以及在何处调用它。我尝试了addFile,但是它返回的是Unit而不是String,我认为您需要以某种方式通过管道传输Binary文件。因此,我不确定现在该怎么办。任何帮助深表感谢

因此,我已经能够通过使用mapPartitions对其进行某种程度的序列化,并在每个本地分区上进行迭代,但是我想知道是否存在一种更有效的方法,因为我拥有数百万个数据集

Lyn*_*hen 5

假设您的csv文件每行包含一个IP地址,例如,您想将每个IP地址映射到一个城市。

import com.snowplowanalytics.maxmind.iplookups.IpLookups

val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)

def parseIP(ip:String, ipLookups: IpLookups): String = {
  val lookupResult = ipLookups.performLookups(ip)
  val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}

val logs = sc.textFile("path/to/your.csv")
             .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
Run Code Online (Sandbox Code Playgroud)

有关其他IP转换,请参考Scala MaxMind IP查找。而且,mapWith似乎已被弃用。使用mapPartitionsWithIndex代替。