groupBy 聚合函数中的 PySpark 循环

Bad*_*lou 1 group-by aggregate pyspark

我有一个大表,我正在尝试计算按位置分组的某些列的总和(有条件)。

我的代码看起来像这样,并且我的列越来越多

df.groupBy(location_column).agg(
        F.sum(F.when(F.col(col1) == True, F.col(value))).alias("SUM " + col1),
        F.sum(F.when(F.col(col2) == True, F.col(value))).alias("SUM " + col2),
        F.sum(F.when(F.col(col3) == True, F.col(value))).alias("SUM " + col3),
        ....
        # Additional lines for additional columns (around 20)
)
Run Code Online (Sandbox Code Playgroud)

我想通过基本上做类似的事情来重构我的代码,使其看起来不那么愚蠢

cols = [col1, col2, col3, ... , coln]
df.groupBy(location_column).agg([F.sum(F.when(F.col(x) == True, F.col(value))).alias("SUM " + x)] for x in cols)
Run Code Online (Sandbox Code Playgroud)

它不起作用,因为 agg() 函数不接受列表:

assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
Run Code Online (Sandbox Code Playgroud)

有重构的解决方案吗?谢谢

mck*_*mck 6

for x in cols应该在方括号内。您还需要在列表理解之前添加一个*来扩展参数:

df.groupBy(location_column).agg(
    *[F.sum(F.when(F.col(x) == True, F.col(value))).alias("SUM " + x) for x in cols]
)
Run Code Online (Sandbox Code Playgroud)