jde*_*lop 3 bloom-filter apache-spark
我有一个巨大的RDD(源),我需要从中创建一个BloomFilter数据,因此对用户数据的后续更新将只考虑真正的“差异”,而不重复。
看起来BloomFilter的大多数实现都是不可序列化的(尽管可以很容易地解决),但是我想要稍微不同的工作流程:
mapPartition-RDD上有可用的函数,但是希望我返回一个Iterator。也许我可以使用传递的迭代器,创建BloomFilter的实例,将其写入某个地方,然后以Iterator.singleton[PathToFile]?形式返回到创建文件的链接?consume处理的结果(文件路径列表),读取这些文件并在内存中聚合BloomFilter。然后将响应写入二进制文件。我不知道正确的方法:
mapPartitionsconsume(当我具有包含文件路径的RDD时,我必须使用它SparkContext来读取它们-不知道怎么可能)。谢谢!
breeze实现不是最快的实现,但是它具有通常的Spark依赖项,可以与simple aggregate以下项一起使用:
import breeze.util.BloomFilter
// Adjust values to fit your case
val numBuckets: Int = 100
val numHashFunctions: Int = 30
val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4)
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
_ += _, _ |= _
)
bf.contains("a")
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
bf.contains("n")
Run Code Online (Sandbox Code Playgroud)
Boolean = false
Run Code Online (Sandbox Code Playgroud)
在Spark 2.0+中,您可以使用DataFrameStatFunctions.bloomFilter:
val df = rdd.toDF
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp)
sbf.mightContain("a")
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
sbf.mightContain("n")
Run Code Online (Sandbox Code Playgroud)
Boolean = false
Run Code Online (Sandbox Code Playgroud)
Algebird实现也可以正常工作,并且可以与breeze实现类似地使用。
| 归档时间: |
|
| 查看次数: |
3054 次 |
| 最近记录: |