提高 Spark ML ALS 的计算性能

Nik*_*Nik 5 performance amazon-emr apache-spark apache-spark-sql

我有一个 Spark 作业,它对隐式反馈评级矩阵执行交替最小二乘法 (ALS) 。我按如下方式创建 ALS 对象。

val als = new ALS()
  .setCheckpointInterval(5)
  .setRank(150)
  .setAlpha(30.0)
  .setMaxIter(25)
  .setRegParam(0.001)
  .setUserCol("userId")
  .setItemCol("itemId")
  .setRatingCol("rating")
  .setImplicitPrefs(true)
  .setIntermediateStorageLevel("MEMORY_ONLY")
  .setFinalStorageLevel("MEMORY_ONLY")
Run Code Online (Sandbox Code Playgroud)

创建评级矩阵并用于拟合 ALS 模型,如下所示。

val ratingsSchema = StructType(Array(
  StructField("userId", IntegerType, nullable = true), 
  StructField("itemId", IntegerType, nullable = true),
  StructField("rating", DoubleType, nullable = true)))

val ratings = spark
  .read
  .format("parquet")
  .schema(ratingsSchema)
  .load("/ratings")
  .cache()

val model = als.fit(ratings)
Run Code Online (Sandbox Code Playgroud)

DataFrame 中有大约 1.5 亿唯一用户和 100 万个项目ratings,其中约有 8.5 亿行。

根据上面的数字,满载时评级 DataFrame 应占用约 20 GB 的内存空间。userFactors DataFrame 大小为 150 MM x 150 doubles = 180 GB(大约)。itemFactors DataFrame 应仅为 1.2GB。

这项工作需要很长时间才能完成(15 个小时以上)。我的集群规格如下。

Provider: AWS EMR version 5.14.0
Spark version: 2.3.0
Cluster:
  1 MASTER node: m4.xlarge (8 cores, 16GB mem, 32GB storage)
  2 CORE nodes: i3.xlarge (4 cores, 30GB mem, 950 GB storage)
  20 TASK nodes: r4.4xlarge (16 cores, 122GB mem, 32 GB storage)
  Total TASK cores = 320
  Total TASK memory: 2440 GB
Run Code Online (Sandbox Code Playgroud)

根据上面的数字,所有 DF 都应该可以轻松装入内存(如果需要,还可以使用 TB+ HDFS)。

作业配置:

--executor-memory 102g 
--num-executors 20 
--executor-cores 15
Run Code Online (Sandbox Code Playgroud)

我可以看到有 20 个执行程序正在运行(还有一个驱动程序)。我尝试过缓存评级 DF,也尝试过不缓存。

如果可能的话,如何调整系统使其运行得更快?

有人对ALS 工作有深入了解吗?它会做很多洗牌吗?我们怎样才能最大限度地减少洗牌?

评级矩阵采用parquet存储在 S3 存储桶中的 200 个文件的格式。

如果我有很多小型实例(例如 50 个),还是应该获得一些(例如 5 个)非常大的实例,例如 r4.16xlarge(64 核,488GB 内存),效果会更好吗?