Ren*_*jam 9 apache-spark apache-spark-ml apache-spark-dataset
我有格式化为以下示例的大数据记录:
// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc| 123| true|
// |abc| 345| true|
// |abc| 567| true|
// |def| 123| true|
// |def| 345| true|
// |def| 567| true|
// |def| 789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)
cid并且itemId是字符串。
有965,964,223条记录。
我正在尝试cid使用StringIndexer以下方法将其转换为整数:
dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)
但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。
我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。
我可以进一步改善性能吗?任何帮助都感激不尽。
如果列的基数很高,那么这是预期的行为。作为训练过程的一部分,StringIndexer收集所有标签,并创建标签-索引映射(使用 Spark o.a.s.util.collection.OpenHashMap)。
在最坏的情况下,此过程需要 O(N) 内存,并且是计算密集型和内存密集型的。
如果列的基数很高,并且其内容将用作特征,则最好应用FeatureHasher(Spark 2.3 或更高版本)。
import org.apache.spark.ml.feature.FeatureHasher
val hasher = new FeatureHasher()
.setInputCols("cid")
.setOutputCols("cid_hash_vec")
hasher.transform(dataset)
Run Code Online (Sandbox Code Playgroud)
它不保证唯一性并且不可逆,但它对于许多应用来说已经足够好了,并且不需要拟合过程。
对于不用作功能的列,您还可以使用hash函数:
import org.apache.spark.sql.functions.hash
dataset.withColumn("cid_hash", hash($"cid"))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
412 次 |
| 最近记录: |