Suppose I have a DataFrame with one column of users and another column of words they have written:
Row(user='Bob', word='hello')
Row(user='Bob', word='world')
Row(user='Mary', word='Have')
Row(user='Mary', word='a')
Row(user='Mary', word='nice')
Row(user='Mary', word='day')
I would like to aggregate the word column into a vector:
Row(user='Bob', words=['hello','world'])
Row(user='Mary', words=['Have','a','nice','day'])
It seems I can't use any of Spark's grouping functions, because they expect a subsequent aggregation step. My use case is that I want to feed this data to Word2Vec without applying any other Spark aggregation.
Thanks to @titipat for the RDD solution. Shortly after posting, I realized there is actually a DataFrame solution using collect_set (or collect_list):
from pyspark.sql import Row
from pyspark.sql.functions import collect_set
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
group_user = df.groupBy('user').agg(collect_set('word').alias('words'))
print(group_user.collect())
>[Row(user='Mary', words=['Have', 'nice', 'day', 'a']), Row(user='Bob', words=['world', 'hello'])]
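Since the point was to feed these word lists into Word2Vec, the aggregated DataFrame can go straight into pyspark.ml.feature.Word2Vec with words as the input column. A minimal sketch (vectorSize and minCount are illustrative values, not from the original post):

from pyspark.ml.feature import Word2Vec

# Fit Word2Vec on the aggregated array column produced above.
# vectorSize/minCount are illustrative; tune them for real data.
w2v = Word2Vec(vectorSize=50, minCount=1, inputCol='words', outputCol='vecs')
model = w2v.fit(group_user)
model.getVectors().show()  # one embedding row per word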
Or, with collect_list, which keeps duplicate words:
from pyspark.sql import functions as F
df.groupby("user").agg(F.collect_list("word"))
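Note that neither collect_list nor collect_set guarantees the order of the collected elements. One common workaround, sketched here assuming a hypothetical ordering column ts, is to collect (ts, word) structs, sort the array, and project the words back out:

from pyspark.sql import functions as F

# 'ts' is a hypothetical ordering column. sort_array orders the structs by
# their first field; 'pairs.word' then extracts just the words, in order.
ordered = (df.groupBy('user')
             .agg(F.sort_array(F.collect_list(F.struct('ts', 'word'))).alias('pairs'))
             .select('user', F.col('pairs.word').alias('words')))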
Here is a solution using an RDD:
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
group_user = rdd.groupBy(lambda x: x.user)
group_agg = group_user.map(lambda x: Row(**{'user': x[0], 'word': [t.word for t in x[1]]}))
The output from group_agg.collect():
[Row(user='Bob', word=['hello', 'world']),
Row(user='Mary', word=['Have', 'a', 'nice', 'day'])]
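An alternative sketch of the same aggregation with reduceByKey, which concatenates the per-user lists as pairs are combined instead of materializing a grouped iterator; the result is equivalent for this example:

# Map each Row to a (user, [word]) pair, concatenate lists per user,
# then rebuild Rows with the collected words.
group_agg = (rdd.map(lambda x: (x.user, [x.word]))
                .reduceByKey(lambda a, b: a + b)
                .map(lambda kv: Row(user=kv[0], word=kv[1])))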
Starting with Spark 2.3, we now have Pandas UDFs (a.k.a. vectorized UDFs). The function below accomplishes the OP's task... A nice benefit of using this function is that the order is guaranteed to be preserved. Order is crucial in many cases, such as time series analysis.
import pandas as pd
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType
spark = SparkSession.builder.appName('test_collect_array_grouped').getOrCreate()
def collect_array_grouped(df, groupbyCols, aggregateCol, outputCol):
    """
    Aggregate function: returns a new :class:`DataFrame` such that for a given column, aggregateCol,
    in a DataFrame, df, collect into an array the elements for each grouping defined by the groupbyCols list.
    The new DataFrame will have, for each row, the grouping columns and an array of the grouped
    values from aggregateCol in the outputCol.

    :param groupbyCols: list of columns to group by.
        Each element should be a column name (string) or an expression (:class:`Column`).
    :param aggregateCol: the column name of the column of values to aggregate into an array
        for each grouping.
    :param outputCol: the column name of the column to output the aggregated array to.
    """
    groupbyCols = [] if groupbyCols is None else groupbyCols
    df = df.select(groupbyCols + [aggregateCol])
    schema = df.select(groupbyCols).schema
    aggSchema = df.select(aggregateCol).schema
    arrayField = StructField(name=outputCol, dataType=ArrayType(aggSchema[0].dataType, False))
    schema = schema.add(arrayField)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _get_array(pd_df):
        vals = pd_df[groupbyCols].iloc[0].tolist()
        vals.append(pd_df[aggregateCol].values)
        return pd.DataFrame([vals])

    return df.groupby(groupbyCols).apply(_get_array)
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
collect_array_grouped(df, ['user'], 'word', 'users_words').show()
+----+--------------------+
|user| users_words|
+----+--------------------+
|Mary|[Have, a, nice, day]|
| Bob| [hello, world]|
+----+--------------------+
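As an aside, on Spark 3.x the PandasUDFType.GROUPED_MAP form used above is deprecated in favor of groupby(...).applyInPandas(func, schema). A minimal sketch of the same aggregation in that style (the DDL schema string is an assumption matching the example columns):

# Spark 3.x style: a plain function plus an output schema (DDL string).
def to_array(pd_df):
    return pd.DataFrame([[pd_df['user'].iloc[0], pd_df['word'].values]])

df.groupby('user').applyInPandas(to_array, 'user string, users_words array<string>').show()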