pat*_*rit 11 algorithm apache-spark spark-streaming apache-spark-sql spark-dataframe
我有一个数据集,包括(sensor_id, timestamp, data)(在sensor_id是的IoT设备的ID,时间戳是UNIX时间和数据是它们的输出在当时的MD5散列).表中没有主键,但每行都是唯一的.
我需要找到所有sensor_ids 对,s1并且s2这两个传感器在它们之间至少具有n(n=50)条目(timestamp, data),即在n不同的情况下它们在相同的时间戳发出相同的数据.
对于数据的大小感,我有10B行和~50M不同sensor_ids,我相信大约有~5M对传感器ID,它们在同一时间戳发出相同数据至少50次.
Spark中最好的方法是什么?我尝试了各种方法(分组(timestamp, data)和/或自连接),但它们的复杂性非常昂贵.
如果我的理解是正确的,那么我可以使用下面的简单代码来实现这一点,
test("Spark: Find pairs having atleast n common attributes"){
/**
* s1,1210283218710,34
s1,1210283218730,24
s1,1210283218750,84
s1,1210283218780,54
s2,1210283218710,34
s2,1210283218730,24
s2,1210283218750,84
s2,1210283218780,54
s3,1210283218730,24
s3,1210283218750,84
s3,1210283218780,54
*/
val duplicateSensors = sc.textFile("sensor_data")
.map(line => line.split(",")).map(ar=>((ar(1),ar(2)),ar(0) )) // (ts,val),sid
.aggregateByKey(List.empty[String])(_ :+_,_:::_)// grouped(ts,val)(List(n sid))
.flatMapValues(l => l.sorted.combinations(2))// (ts,val)(List(2 sid combination))
.map(_._2).countByValue() // List(s1, s3) -> 3, List(s2, s3) -> 3, List(s1, s2) -> 4 (2sensors, no of common entries)
// Now Do the filter .... grater than 50
duplicateSensors.foreach(println)
}
Run Code Online (Sandbox Code Playgroud)
您将获得具有共同属性的对数。
| 归档时间: |
|
| 查看次数: |
332 次 |
| 最近记录: |