如何为Spark RDD中的元素分配唯一的连续数字

Dil*_*nga 46 apache-spark apache-spark-mllib

我有一个数据集(user, product, review),并希望将其提供给mllib的ALS算法.

该算法需要用户和产品为数字,而我的是String用户名和字符串SKU.

现在,我获得了不同的用户和SKU,然后在Spark之外为他们分配数字ID.

我想知道是否有更好的方法来做到这一点.我想到的一种方法是编写一个自定义RDD,基本上枚举1到n,然后在两个RDD上调用zip.

Dan*_*bos 41

Spark 1.0开始,您可以使用两种方法轻松解决此问题:

  • RDD.zipWithIndex就像Seq.zipWithIndex,它添加了连续的(Long)数字.这需要先计算每个分区中的元素,因此您的输入将被评估两次.如果要使用它,请缓存输入RDD.
  • RDD.zipWithUniqueId还为您提供了唯一的LongID,但不保证它们是连续的.(如果每个分区具有相同数量的元素,它们将只是连续的.)好处是,这不需要知道有关输入的任何信息,因此不会导致双重评估.

  • 谢谢。所以 RDD.zipWithUniqueId 不会扫描数据集两次? (2认同)
  • 正确.请参阅https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1298. (2认同)

Sea*_*wen 15

对于类似的示例用例,我只是对字符串值进行了哈希处理.见http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))
Run Code Online (Sandbox Code Playgroud)

虽然哈希可以更容易管理,但听起来你已经在做这样的事了.

Matei在这里提出了一种模拟zipWithIndexRDD的方法,相当于在每个区域内分配全局唯一的ID:https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E

  • 这是个好主意,但必须对[碰撞次数]保持谨慎(https://en.wikipedia.org/wiki/Birthday_problem).对于正被编码的集合的arity(例如,标签,用户名等)接近100k的应用,冲突的数量可能很大. (2认同)

rad*_*1st 8

另一个简单的选择,如果使用DataFrames并且只关注唯一性,则使用MonotonicallyIncreasingID函数

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)
Run Code Online (Sandbox Code Playgroud)

编辑:自Spark 2.0以来MonotonicallyIncreasingID已被弃用和删除; 它现在被称为.monotonically_increasing_id

  • 这种方法实际上不适用于 ALS 中的用户/项目标识符,因为 monotonically_increasing_id() 产生 64 位数字(即 long 不是 int),而“基于 DataFrame 的 ALS API 目前仅支持用户和项目 ID”(来自 https://spark.apache.org/docs/2.0.0/ml-collaborative-filtering.html) (2认同)