使用Apache-Spark,根据条件减少或折叠RDD

Ale*_*ese 1 reduce fold apache-spark rdd

我正在使用Apache Spark和Scala.我有一个RDD的String,Int

val counts =words.map(word => (word, 1)).reduceByKey((a,b) => (a + b))     
Run Code Online (Sandbox Code Playgroud)

现在我通过Key减少了RDD,但是我想添加另一个功能来减少相似的单词.

我虽然使用Levenshtein距离,欧几里德距离或余弦距离.

那么,我如何应用其中一个函数来减少我的RDD?

例:

RDD ->  (forks,12), (fork,4), (chair,15) , (table,1), (tables,11)
Run Code Online (Sandbox Code Playgroud)

承认相似度算法有效,如何获得减少的RDD,如:

RDD -> (fork,16), (table,12), (chair,15)
Run Code Online (Sandbox Code Playgroud)

我尝试过类似的东西:

counts.foldLeft(){(x,y) => 
  if(x._1.euclideanDistance(y._1) > 0.9) 
    (x,x._2+y._2) 
}
Run Code Online (Sandbox Code Playgroud)

Dan*_*bos 6

你在尝试什么是行不通的.

如果您只有一个distance(a, b)功能,那么解决问题的效率和复杂度都非常低.您需要使用RDD.cartesian生成所有可能的(word1, word2)对.然后过滤掉距离太远的那些.现在你有了相似的单词对.比方说,他们(fox, fix),(fix, six)和他们的逆转.然后,您要总结为计数fox,fixsix.为此,您需要在由相似单词对定义的图形中找到连接的组件.获得每个单词的组件ID后,您可以按组件ID对计数求和.

我认为解决方案是编写一个可以将单词转换为"规范"形式的函数.它会变成forks,forkingforkedfork.然后你可以reduceByKey再次申请.

没有Spark,这将是最快的步骤.一旦counts使用Spark 计算,就会有一个很小的数据集 - 每个不同的单词都有一个整数.这是最简单的collect,然后mapgroupBy counts本地.