Gam*_*ows 1 scala geoip apache-spark
我正在尝试将MaxMind GeoIP API用于scala-spark,该https://github.com/snowplow/scala-maxmind-iplookups找到了。我使用标准加载文件:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
Run Code Online (Sandbox Code Playgroud)
我有一个基本的csv文件,该文件中包含时间和IP地址:
val sweek1 = week1.map{line=> IP(parse(line))}.collect{
case Some(ip) => {
val ipadress = ipdetect(ip.ip)
(ip.time, ipadress)
}
}
Run Code Online (Sandbox Code Playgroud)
ipdetect函数的基本定义是:
def ipdetect(a:String)={
ipLookups.performLookups(a)._1 match{
case Some(value) => value.toString
case _ => "Unknown"
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行该程序时,它提示“任务不可序列化”。所以我读了几篇文章,似乎有一些解决方法。
1,包装 2,使用SparkContext.addFile(在整个群集中分发文件)
但是我无法弄清楚它们中的任何一个是如何工作的,我尝试了包装器,但是我不知道如何以及在何处调用它。我尝试了addFile,但是它返回的是Unit而不是String,我认为您需要以某种方式通过管道传输Binary文件。因此,我不确定现在该怎么办。任何帮助深表感谢
因此,我已经能够通过使用mapPartitions对其进行某种程度的序列化,并在每个本地分区上进行迭代,但是我想知道是否存在一种更有效的方法,因为我拥有数百万个数据集
假设您的csv文件每行包含一个IP地址,例如,您想将每个IP地址映射到一个城市。
import com.snowplowanalytics.maxmind.iplookups.IpLookups
val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)
def parseIP(ip:String, ipLookups: IpLookups): String = {
val lookupResult = ipLookups.performLookups(ip)
val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}
val logs = sc.textFile("path/to/your.csv")
.mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
Run Code Online (Sandbox Code Playgroud)
有关其他IP转换,请参考Scala MaxMind IP查找。而且,mapWith似乎已被弃用。使用mapPartitionsWithIndex代替。