大量列的性能下降.Pyspark

Ant*_*eev 10 python machine-learning pandas apache-spark pyspark

我遇到了处理火花宽数据帧(大约9000列,有时甚至更多)的问题.
任务:

  1. 通过groupBy和pivot创建宽DF.
  2. 将列转换为vector并从pyspark.ml处理到KMeans.

所以我制作了大量的框架并尝试使用VectorAssembler创建矢量,缓存它并在其上训练KMeans.
在独立模式下,对于帧~500x9000,我的电脑上的7个不同数量的集群需要大约11分钟的组装时间和2分钟的KMeans.另一方面,pandas(pivot df和iterate 7 clusters)中的这种处理花费的时间少于一分钟.
显然我理解独立模式和缓存等的开销和性能下降但是它真的让我气馁.
有人可以解释我如何避免这种开销吗?
人们如何使用宽DF而不是使用vectorassembler并降低性能?
更正式的问题(对于sof规则)听起来像 - 我怎样才能加快这段代码的速度?

%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue':'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
 Wall time: 1min 32s
Run Code Online (Sandbox Code Playgroud)

配置:

.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
Run Code Online (Sandbox Code Playgroud)

Ant*_*eev 2

实际上在rdd的map中找到了解决方案。

  1. 首先,我们要创建价值地图。
  2. 还提取所有不同的名称。
  3. 倒数第二步,我们在名称字典中搜索行映射的每个值,并返回值,如果没有找到,则返回 0。
  4. 结果向量汇编器。

优点:

  1. 您不必创建具有大量列数的宽数据框,从而避免开销。(速度从 11 分钟提高到 1 分钟。)
  2. 您仍然在集群上工作并以 Spark 范例执行代码。

代码示例:scala 实现