Pyspark:在数据帧的不同组上应用 kmeans

Eva*_*Eva 13 group-by k-means apache-spark pyspark

使用 Pyspark,我想将 kmeans 分别应用于数据帧的组,而不是一次应用于整个数据帧。目前,我使用了一个 for 循环,它对每个组进行迭代,应用 kmeans 并将结果附加到另一个表中。但是有很多组会很费时间。有人可以帮我吗??非常感谢!

for customer in customer_list:
    temp_df = togroup.filter(col("customer_id")==customer)
    df = assembler.transform(temp_df)
    k = 1
    while (k < 5 & mtrc < width):
        k += 1
        kmeans = KMeans(k=k,seed=5,maxIter=20,initSteps=5)
        model = kmeans.fit(df)
        mtric = 1 - model.computeCost(df)/ttvar
        a = model.transform(df)select(cols)
        allcustomers = allcustomers .union(a)
Run Code Online (Sandbox Code Playgroud)

pmj*_*jn6 1

我想出了一个使用 pandas_udf 的解决方案。纯 Spark 或 scala 解决方案是首选,但尚未提供。假设我的数据是

import pandas as pd
df_pd = pd.DataFrame([['cat1',10.],['cat1',20.],['cat1',11.],['cat1',21.],['cat1',22.],['cat1',9.],['cat2',101.],['cat2',201.],['cat2',111.],['cat2',214.],['cat2',224.],['cat2',99.]],columns=['cat','val'])
df_sprk = spark.createDataFrame(df_pd)
Run Code Online (Sandbox Code Playgroud)

首先解决pandas中的问题:

from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=2,random_state=0)

def skmean(kmeans,x):
    X = np.array(x)
    kmeans.fit(X)
    return(kmeans.predict(X))
Run Code Online (Sandbox Code Playgroud)

您可以将 skmean() 应用于 panda 数据框(以确保其正常工作):

df_pd.groupby('cat').apply(lambda x:skmean(kmeans,x)).reset_index()
Run Code Online (Sandbox Code Playgroud)

为了将该函数应用于 pyspark 数据框,我们使用 pandas_udf。但首先为输出数据帧定义一个模式:

from pyspark.sql.types import *
schema = StructType(
       [StructField('cat',StringType(),True),
        StructField('clusters',ArrayType(IntegerType()))])
Run Code Online (Sandbox Code Playgroud)

将上面的函数转换为 pandas_udf:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType  

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def skmean_udf(df):
    result = pd.DataFrame(
             df.groupby('cat').apply(lambda x: skmean(kmeans,x))
    result.reset_index(inplace=True, drop=False)
    return(result)
Run Code Online (Sandbox Code Playgroud)

您可以按如下方式使用该功能:

df_spark.groupby('cat').apply(skmean_udf).show()
Run Code Online (Sandbox Code Playgroud)