如何计算pyspark中每行的某些列的最大值

Jua*_*vid 5 python pyspark

我在pyspark中使用sqlContext.sql函数读取数据帧.这包含4个数字列,每个客户端包含信息(这是密钥ID).我需要计算每个客户端的最大值并将此值连接到数据框:

+--------+-------+-------+-------+-------+
|ClientId|m_ant21|m_ant22|m_ant23|m_ant24|
+--------+-------+-------+-------+-------+
|       0|   null|   null|   null|   null|
|       1|   null|   null|   null|   null|
|       2|   null|   null|   null|   null|
|       3|   null|   null|   null|   null|
|       4|   null|   null|   null|   null|
|       5|   null|   null|   null|   null|
|       6|     23|     13|     17|      8|
|       7|   null|   null|   null|   null|
|       8|   null|   null|   null|   null|
|       9|   null|   null|   null|   null|
|      10|     34|      2|      4|      0|
|      11|      0|      0|      0|      0|
|      12|      0|      0|      0|      0|
|      13|      0|      0|     30|      0|
|      14|   null|   null|   null|   null|
|      15|   null|   null|   null|   null|
|      16|     37|     29|     29|     29|
|      17|      0|      0|     16|      0|
|      18|      0|      0|      0|      0|
|      19|   null|   null|   null|   null|
+--------+-------+-------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

在这种情况下,客户端"6"的最大值为23,客户端"10"为30."新"列在新列中自然为空.

请帮我展示我该怎么做这个操作.

She*_*xed 8

有一个功能:pyspark.sql.functions.greatest

>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
[Row(greatest=4)]
Run Code Online (Sandbox Code Playgroud)

该示例直接来自docs

最少的是相反的。)


Kon*_*ewa 4

我认为将值组合到列表中然后在其上查找最大值将是最简单的方法。

from pyspark.sql.types import *

schema = StructType([
    StructField("ClientId", IntegerType(), True),
    StructField("m_ant21", IntegerType(), True),
    StructField("m_ant22", IntegerType(), True),
    StructField("m_ant23", IntegerType(), True),
    StructField("m_ant24", IntegerType(), True)
])

df = spark\
    .createDataFrame(
        data=[(0, None, None, None, None),
             (1, 23, 13, 17, 99),
             (2, 0, 0, 0, 1),
             (3, 0, None, 1, 0)],
        schema=schema)

import pyspark.sql.functions as F

def agg_to_list(m21,m22,m23,m24):
    return [m21,m22,m23,m24]

u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType()))

df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\
        .withColumn('max', F.sort_array("all_values", False)[0])\
        .select('ClientId', 'max')

df2.show()
Run Code Online (Sandbox Code Playgroud)

输出:

+--------+----+
|ClientId|max |
+--------+----+
|0       |null|
|1       |99  |
|2       |1   |
|3       |1   |
+--------+----+
Run Code Online (Sandbox Code Playgroud)