获取组中的第一个非空值(Spark 1.6)

ksi*_*ndi 3 apache-spark pyspark spark-dataframe apache-spark-1.6

如何从组中获取第一个非空值?我尝试首先使用coalesce F.first(F.coalesce("code"))但我没有得到所需的行为(我似乎得到了第一行).

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])
Run Code Online (Sandbox Code Playgroud)

我试过了:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())
Run Code Online (Sandbox Code Playgroud)

期望的输出

[Row(id='a', code='code1', name='name2')]
Run Code Online (Sandbox Code Playgroud)

Dan*_*ula 13

对于Spark 1.3 - 1.5,这可以解决问题:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)

编辑

显然,在1.6版本中,它们改变了first聚合函数的处理方式.现在,底层类First应该使用第二个参数构造,聚合函数ignoreNullsExpr尚未使用该first参数(这里可以看到).但是,在Spark 2.0中,它将能够调用agg(F.first(col, True))忽略空值(可在此处查看).

因此,对于Spark 1.6来说,这种方法必须是不同的,而且效率要低一些,这是非常有效的.一个想法如下:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

也许有更好的选择.如果我找到答案,我会编辑答案.

  • 这里有一个重要的问题 - 没有窗口规范的`first`是不确定的. (2认同)