重命名pyspark数据帧聚合的列

can*_*his 56 dataframe pyspark

我正在使用pyspark数据帧分析一些数据,假设我有一个df我正在聚合的数据帧:

df.groupBy("group")\
  .agg({"money":"sum"})\
  .show(100)
Run Code Online (Sandbox Code Playgroud)

这会给我:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646
Run Code Online (Sandbox Code Playgroud)

聚合工作正常,但我不喜欢新的列名"SUM(钱#2L)".有没有一种巧妙的方法可以将此列重命名为人类可读的.agg方法?也许更类似于人们会做的事情dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
Run Code Online (Sandbox Code Playgroud)

can*_*his 97

虽然我仍然喜欢dplyr语法,但这段代码片段可以:

import pyspark.sql.functions as sf

df.groupBy("group")\
  .agg(sf.sum('money').alias('money'))\
  .show(100)
Run Code Online (Sandbox Code Playgroud)

它变得冗长.

  • 对于复制粘贴了此“别名”部分但看不到影响的其他任何人,请注意括号。“ alias('string')”存在于“ agg”内部,否则,您将为整个DataFrame别名,而不仅仅是该列。 (4认同)

dnl*_*rky 49

withColumnRenamed应该做的伎俩.这是pyspark.sql API的链接.

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)
Run Code Online (Sandbox Code Playgroud)

  • “alias”是一个很好的指针,但这是正确的答案 - 有时有充分的理由在“agg”中使用字典,并且似乎“别名”聚合列的唯一方法是重命名它。 (3认同)

小智 7

这很简单:

 val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
Run Code Online (Sandbox Code Playgroud)

.as在 agg 中使用来命名创建的新行。

  • 从 PySpark 2.4.0 开始,`.as('new_name')` 应该是 `.alias('new_name')`。 (5认同)

Aar*_*les 6

我为此做了一个小助手功能,可能会帮助一些人。

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df
Run Code Online (Sandbox Code Playgroud)

一个例子:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

Run Code Online (Sandbox Code Playgroud)

至少做一点让人们免于打字。

  • 非常有用和及时。我刚要问同样的问题。如果您可以在 `agg` dict 中指定一个新列名(我的意思是在 Spark 中),那就太好了。 (2认同)

Nea*_*eal 6

.alias.withColumnRenamed如果您愿意对列名称进行硬编码,则两者都可以工作。如果您需要一个编程解决方案,例如用于聚合所有剩余列的更友好的名称,这提供了一个很好的起点:

grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
    df
    .groupBy(grouping_column)
    .agg(
        *cols
    )
)
Run Code Online (Sandbox Code Playgroud)