在大型记录上,Spark StringIndexer.fit非常慢

Ren*_*jam 9 apache-spark apache-spark-ml apache-spark-dataset

我有格式化为以下示例的大数据记录:

// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc|   123|  true|
// |abc|   345|  true|
// |abc|   567|  true|
// |def|   123|  true|
// |def|   345|  true|
// |def|   567|  true|
// |def|   789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)

cid并且itemId是字符串。

有965,964,223条记录。

我正在尝试cid使用StringIndexer以下方法将其转换为整数:

dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)

但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。

我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。

我可以进一步改善性能吗?任何帮助都感激不尽。

104*_*ica 5

如果列的基数很高,那么这是预期的行为。作为训练过程的一部分,StringIndexer收集所有标签,并创建标签-索引映射(使用 Spark o.a.s.util.collection.OpenHashMap)。

在最坏的情况下,此过程需要 O(N) 内存,并且是计算密集型和内存密集型的。

如果列的基数很高,并且其内容将用作特征,则最好应用FeatureHasher(Spark 2.3 或更高版本)。

import org.apache.spark.ml.feature.FeatureHasher

val hasher = new FeatureHasher()
  .setInputCols("cid")
  .setOutputCols("cid_hash_vec")
hasher.transform(dataset)
Run Code Online (Sandbox Code Playgroud)

它不保证唯一性并且不可逆,但它对于许多应用来说已经足够好了,并且不需要拟合过程。

对于不用作功能的列,您还可以使用hash函数:

import org.apache.spark.sql.functions.hash

dataset.withColumn("cid_hash", hash($"cid"))
Run Code Online (Sandbox Code Playgroud)