如何在pyspark中动态聚合列

neo*_*bot 0 aggregate-functions pyspark

pct_<original_name>_valid我想计算每个输入列的非缺失值的百分比。在此示例中只有 2 列,因此可以轻松手动编写下面的代码。但是当有 30 多个列时,我不想手动执行此操作。甚至可以动态地执行此操作吗?(例如,将列名称列表作为输入)

import pyspark.sql.functions as F

d = [{'name': 'Alice', 'age': 1}, {'name': 'Bae', 'age': None}]
df = spark.createDataFrame(d)

df.withColumn('name_valid', F.when(col("name").isNotNull(),1).otherwise(0))\
.withColumn('age_valid', F.when(col("age").isNotNull(),1).otherwise(0))\
.agg(
    (100.0*F.sum(col("name_valid"))/F.count(F.lit(1))).alias("pct_name_valid"),
    (100.0*F.sum(col("age_valid"))/F.count(F.lit(1))).alias("pct_age_valid")
)\
.show()
Run Code Online (Sandbox Code Playgroud)

结果如下:

+--------------+-------------+
|pct_name_valid|pct_age_valid|
+--------------+-------------+
|         100.0|         50.0|
+--------------+-------------+
Run Code Online (Sandbox Code Playgroud)

如前所述,我不想对所有 30 多个列手动执行此操作。有什么办法我可以这样做:

my_output = calculate_non_missing_percentage(df, my_columns = ["name", "age", "gender", "school", "color"])
Run Code Online (Sandbox Code Playgroud)

Ste*_*ven 5

您可以使用列的名称动态聚合列。

cols = df.columns

# transform null values in 0, else 1
df = df.select(
    *(
        F.when(
            F.col(col).isNull(),
            0
        ).otherwise(1).alias(col)
        for col
        in cols
    )
)

# percentage of non-missing value
df = df.agg(
    *(
        (F.sum(col)/F.count(col)).alias('{}_ratio'.format(col))
        for col
        in cols
    )
)

df.show()                                                                                                       
+---------+----------+
|age_ratio|name_ratio|
+---------+----------+
|      0.5|       1.0|
+---------+----------+
Run Code Online (Sandbox Code Playgroud)