ksi*_*ndi 3 apache-spark pyspark spark-dataframe apache-spark-1.6
如何从组中获取第一个非空值?我尝试首先使用coalesce F.first(F.coalesce("code"))但我没有得到所需的行为(我似乎得到了第一行).
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext("local")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])
Run Code Online (Sandbox Code Playgroud)
我试过了:
(df
.groupby("id")
.agg(F.first(F.coalesce("code")),
F.first(F.coalesce("name")))
.collect())
Run Code Online (Sandbox Code Playgroud)
期望的输出
[Row(id='a', code='code1', name='name2')]
Run Code Online (Sandbox Code Playgroud)
Dan*_*ula 13
对于Spark 1.3 - 1.5,这可以解决问题:
from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()
+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
| a| code1| name2|
+---+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
编辑
显然,在1.6版本中,它们改变了first聚合函数的处理方式.现在,底层类First应该使用第二个参数构造,聚合函数ignoreNullsExpr尚未使用该first参数(这里可以看到).但是,在Spark 2.0中,它将能够调用agg(F.first(col, True))忽略空值(可在此处查看).
因此,对于Spark 1.6来说,这种方法必须是不同的,而且效率要低一些,这是非常有效的.一个想法如下:
from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()
+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
| a| code1| name2|
+---+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)
也许有更好的选择.如果我找到答案,我会编辑答案.
| 归档时间: |
|
| 查看次数: |
10024 次 |
| 最近记录: |