使用布隆过滤器减少

mit*_*hus 3 scala bloom-filter apache-spark

我希望得到一个快速近似集合成员资格,基于应用于字符串向量的大型Spark RDD(~1B记录)的字符串值函数.基本上,这个想法是减少到Bloom过滤器.然后可以将该布隆过滤器广播给工人以供进一步使用.

更具体地说,我目前有

rdd: RDD[Vector[String]]
f: Vector[String] => String
val uniqueVals = rdd.map(f).distinct().collect()
val uv = sc.broadcast(uniqueVals)
Run Code Online (Sandbox Code Playgroud)

但是uniqueVals太大而不实用,我想用更小(和已知)尺寸的东西替换它,即布隆过滤器.

我的问题:

  • 是否可以减少到Bloom过滤器,或者我必须先收集,然后在驱动程序中构建它?

  • 是否有适合这种情况的成熟的Scala/Java Bloom过滤器实现?

Gre*_*ret 10

是的,Bloom过滤器可以减少,因为它们有一些很好的属性(它们是幺半群).这意味着您可以并行执行所有聚合操作,只对数据进行一次有效传递,为每个分区构造BloomFilter,然后将这些BloomFilter一起减少,以获得可查询的单个BloomFilter contains.

Scala中至少有两个BloomFilter实现,它们似乎都是成熟的项目(实际上并没有在生产中使用它们).第一个是Breeze,第二个是Twitter的Algebird.两者都包含不同草图的实现以及更多.

这是一个如何使用Breeze执行此操作的示例:

import breeze.util.BloomFilter

val nums = List(1 to 20: _*).map(_.toString)
val rdd = sc.parallelize(nums, 5)

val bf = rdd.mapPartitions { iter =>
  val bf = BloomFilter.optimallySized[String](10000, 0.001)
  iter.foreach(i => bf += i)
  Iterator(bf)
}.reduce(_ | _)

println(bf.contains("5")) // true
println(bf.contains("31")) // false
Run Code Online (Sandbox Code Playgroud)

  • 此解决方案的一个问题是:在合并它们之前,它会将所有分区的所有布隆过滤器发送给驱动程序,这很容易导致驱动程序内存不足.`treeReduce(_ | _,depth = DEPTH)`通过以树状方式减少来帮助解决这个问题. (2认同)