Spark - 将字符串ID转换为唯一的整数ID

PBJ*_*PBJ 5 apache-spark

我有一个看起来像这样的数据集,其中每个用户和产品ID都是一个字符串:

userA, productX
userA, productX
userB, productY
Run Code Online (Sandbox Code Playgroud)

拥有约280万件产品和3亿用户; 大约21亿用户产品协会.

我的最终目标是在此数据集上运行Spark协作过滤(ALS).由于它需要用户和产品的int键,我的第一步是为每个用户和产品分配一个唯一的int,并转换上面的数据集,以便用int和int表示用户和产品.

这是我到目前为止所尝试的:

val rawInputData = sc.textFile(params.inputPath)
  .filter { line => !(line contains "\\N") }
  .map { line =>
      val parts = line.split("\t")
      (parts(0), parts(1))  // user, product
    }

// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()

// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()

idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)

// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
  case (id1, (id2s, idx1s)) =>
    val idx1 = idx1s.head
    id2s.map { (_, idx1)
    }
}

// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
  case (id2, (idx1s, idx2s)) =>
    val idx2 = idx2s.head
    idx1s.map{ (_, idx2)
    }
}

// save output
val convertedInts = converted.map{
  case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)
Run Code Online (Sandbox Code Playgroud)

当我尝试在我的集群上运行它时(40个执行器,每个5 GB RAM),它能够很好地生成idx1map和idx2map文件,但是在cogroup之后第一个flatMap的内存错误和获取失败时失败.我之前没有做过很多Spark,所以我想知道是否有更好的方法来实现这一点; 我不太清楚这项工作中的哪些步骤会很昂贵.当然,cogroup需要在整个网络中对整个数据集进行混洗; 但这样的事情意味着什么?

FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)
Run Code Online (Sandbox Code Playgroud)

我不仅仅使用散列函数的原因是我最终希望在更大的数据集(大约10亿个产品,10亿用户,350亿个关联)以及Int键冲突的数量上运行它.会变得很大.是否在该规模的数据集上运行ALS甚至接近可行?

Tob*_*ber 2

我看起来你基本上是在收集所有用户列表,只是为了再次将它们分开。尝试只使用 join 而不是 cogroup,在我看来,这更像你想要的。例如:

import org.apache.spark.SparkContext._
// Create some fake data
val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB")))
val userId = sc.parallelize(Seq(("userA",1),("userB",2)))
val productId = sc.parallelize(Seq(("productA",1),("productB",2)))

// Replace userName with ID's
val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)}
// Replace product names with ID's
val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)}

// Check results:
bothReplaced.collect()) // Array((1,1), (1,2), (2,2))
Run Code Online (Sandbox Code Playgroud)

请对其性能发表评论。

(我不知道什么FetchFailed(...)意思)