Spark和BloomFilter共享

jde*_*lop 3 bloom-filter apache-spark

我有一个巨大的RDD(源),我需要从中创建一个BloomFilter数据,因此对用户数据的后续更新将只考虑真正的“差异”,而不重复。

看起来BloomFilter的大多数实现都是不可序列化的(尽管可以很容易地解决),但是我想要稍微不同的工作流程:

  1. 处理每个分区,并为每个分区创建适当的BloomFilter的实例。对于每个BloomFilter对象-将其写入二进制文件。我实际上不知道如何整体处理分区mapPartition-RDD上有可用的函数,但是希望我返回一个Iterator。也许我可以使用传递的迭代器,创建BloomFilter的实例,将其写入某个地方,然后以Iterator.singleton[PathToFile]?形式返回到创建文件的链接?
  2. 在主节点上- consume处理的结果(文件路径列表),读取这些文件并在内存中聚合BloomFilter。然后将响应写入二进制文件。

我不知道正确的方法:

  • 在传递给的函数中,在群集支持的FS中创建文件(可以是HDFS,S3N或本地文件) mapPartitions
  • 在第二阶段使用读取文件的内容consume(当我具有包含文件路径的RDD时,我必须使用它SparkContext来读取它们-不知道怎么可能)。

谢谢!

use*_*411 5

breeze实现不是最快的实现,但是它具有通常的Spark依赖项,可以与simple aggregate以下项一起使用:

import breeze.util.BloomFilter

// Adjust values to fit your case
val numBuckets: Int = 100
val numHashFunctions: Int = 30

val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4)
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
  _ += _, _ |= _
)

bf.contains("a")
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
bf.contains("n")
Run Code Online (Sandbox Code Playgroud)
Boolean = false
Run Code Online (Sandbox Code Playgroud)

在Spark 2.0+中,您可以使用DataFrameStatFunctions.bloomFilter

val df = rdd.toDF

val expectedNumItems: Long = 1000 
val fpp: Double = 0.005

val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp)

sbf.mightContain("a")
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
sbf.mightContain("n")
Run Code Online (Sandbox Code Playgroud)
Boolean = false
Run Code Online (Sandbox Code Playgroud)

Algebird实现也可以正常工作,并且可以与breeze实现类似地使用。