can*_*his 56 dataframe pyspark
我正在使用pyspark数据帧分析一些数据,假设我有一个df我正在聚合的数据帧:
df.groupBy("group")\
.agg({"money":"sum"})\
.show(100)
Run Code Online (Sandbox Code Playgroud)
这会给我:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
Run Code Online (Sandbox Code Playgroud)
聚合工作正常,但我不喜欢新的列名"SUM(钱#2L)".有没有一种巧妙的方法可以将此列重命名为人类可读的.agg方法?也许更类似于人们会做的事情dplyr:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
Run Code Online (Sandbox Code Playgroud)
can*_*his 97
虽然我仍然喜欢dplyr语法,但这段代码片段可以:
import pyspark.sql.functions as sf
df.groupBy("group")\
.agg(sf.sum('money').alias('money'))\
.show(100)
Run Code Online (Sandbox Code Playgroud)
它变得冗长.
dnl*_*rky 49
withColumnRenamed应该做的伎俩.这是pyspark.sql API的链接.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
Run Code Online (Sandbox Code Playgroud)
小智 7
这很简单:
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
Run Code Online (Sandbox Code Playgroud)
.as在 agg 中使用来命名创建的新行。
我为此做了一个小助手功能,可能会帮助一些人。
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
Run Code Online (Sandbox Code Playgroud)
一个例子:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
Run Code Online (Sandbox Code Playgroud)
至少做一点让人们免于打字。
.alias.withColumnRenamed如果您愿意对列名称进行硬编码,则两者都可以工作。如果您需要一个编程解决方案,例如用于聚合所有剩余列的更友好的名称,这提供了一个很好的起点:
grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
df
.groupBy(grouping_column)
.agg(
*cols
)
)
Run Code Online (Sandbox Code Playgroud)