How to do the opposite of explode in PySpark?

Eva*_*mir 9 apache-spark apache-spark-sql pyspark

Suppose I have a DataFrame with one column for users and another column for the words they have written:

Row(user='Bob', word='hello')
Row(user='Bob', word='world')
Row(user='Mary', word='Have')
Row(user='Mary', word='a')
Row(user='Mary', word='nice')
Row(user='Mary', word='day')

I would like to aggregate the word column into a vector:

Row(user='Bob', words=['hello','world'])
Row(user='Mary', words=['Have','a','nice','day'])

It seems I can't use any of Spark's grouping functions, because they expect a subsequent aggregation step. My use case is that I want to feed this data into Word2Vec without using any other Spark aggregations.

Eva*_*mir 17

Thanks to @titipat for the RDD solution. I did realize shortly after my post that there is actually a DataFrame solution using collect_set (or collect_list):

from pyspark.sql import Row
from pyspark.sql.functions import collect_set
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
group_user = df.groupBy('user').agg(collect_set('word').alias('words'))
print(group_user.collect())

>[Row(user='Mary', words=['Have', 'nice', 'day', 'a']), Row(user='Bob', words=['world', 'hello'])]

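With the words column in place, the DataFrame can be fed straight into Spark ML's Word2Vec, which expects an array-of-strings input column (the original use case from the question). A minimal sketch, where the parameter values and the word_vectors column name are just placeholders:

from pyspark.ml.feature import Word2Vec

# fit Word2Vec directly on the aggregated DataFrame; inputCol must be an array<string> column
word2vec = Word2Vec(vectorSize=5, minCount=0, inputCol='words', outputCol='word_vectors')
model = word2vec.fit(group_user)
model.transform(group_user).show(truncate=False)
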
  • Nice solution, Evan! I was also going to post the PySpark DataFrame solution, but you already figured it out :) (2 upvotes)

Pus*_*hkr 9

from pyspark.sql import functions as F

df.groupby("user").agg(F.collect_list("word"))
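
Note that collect_list keeps duplicate words (one array entry per input row), while collect_set in the answer above deduplicates them; neither guarantees a particular ordering of the collected values. A small sketch comparing the two on the same df:

from pyspark.sql import functions as F

# collect_list keeps every occurrence, collect_set keeps only distinct words
df.groupBy('user').agg(
    F.collect_list('word').alias('all_words'),
    F.collect_set('word').alias('unique_words')
).show(truncate=False)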


tit*_*ata 5

Here is a solution using rdd:

from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
group_user = rdd.groupBy(lambda x: x.user)
group_agg = group_user.map(lambda x: Row(**{'user': x[0], 'word': [t.word for t in x[1]]}))

Output from group_agg.collect():

[Row(user='Bob', word=['hello', 'world']),
Row(user='Mary', word=['Have', 'a', 'nice', 'day'])]
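
If a DataFrame is needed afterwards, the aggregated RDD of Rows can be converted back with createDataFrame. A minimal sketch, where group_df is just an illustrative name:

# turn the aggregated RDD of Rows back into a DataFrame
group_df = spark.createDataFrame(group_agg)
group_df.show(truncate=False)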


dmb*_*ker 2

As of Spark version 2.3 we now have Pandas UDFs (a.k.a. vectorized UDFs). The function below will accomplish the OP's task... A benefit of using this function is that the order is guaranteed to be preserved. Order is essential in many cases, such as time series analysis.

import pandas as pd
import findspark

findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType

spark = SparkSession.builder.appName('test_collect_array_grouped').getOrCreate()

def collect_array_grouped(df, groupbyCols, aggregateCol, outputCol):
    """
    Aggregate function: returns a new :class:`DataFrame` such that, for a given column, aggregateCol,
    in a DataFrame, df, the elements of each grouping defined by the groupbyCols list are collected into an array.
    The new DataFrame will have, for each row, the grouping columns and an array of the grouped
    values from aggregateCol in the outputCol.

    :param groupbyCols: list of columns to group by.
            Each element should be a column name (string) or an expression (:class:`Column`).
    :param aggregateCol: the column name of the column of values to aggregate into an array
            for each grouping.
    :param outputCol: the column name of the column to output the aggregated array to.
    """
    groupbyCols = [] if groupbyCols is None else groupbyCols
    df = df.select(groupbyCols + [aggregateCol])
    schema = df.select(groupbyCols).schema
    aggSchema = df.select(aggregateCol).schema
    arrayField = StructField(name=outputCol, dataType=ArrayType(aggSchema[0].dataType, False))
    schema = schema.add(arrayField)
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _get_array(pd_df):
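        # build one output row for this group: the grouping values followed by
        # all of the group's aggregateCol values collected into a single array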
        vals = pd_df[groupbyCols].iloc[0].tolist()
        vals.append(pd_df[aggregateCol].values)
        return pd.DataFrame([vals])
    return df.groupby(groupbyCols).apply(_get_array)

rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)

collect_array_grouped(df, ['user'], 'word', 'users_words').show()

+----+--------------------+
|user|         users_words|
+----+--------------------+
|Mary|[Have, a, nice, day]|
| Bob|      [hello, world]|
+----+--------------------+