如何在pyspark中使用第一个和最后一个函数?

Iva*_*Lee 10 apache-spark pyspark

我使用第一个和最后一个函数来获取一列的第一个和最后一个值.但是,我发现这两个功能都不像我想象的那样有效.我提到了@ zero323答案,但我仍然对这两者感到困惑.代码如:

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)
]).toDF(["k", "v"])
w = Window().partitionBy("k").orderBy('k','v')

df.select(F.col("k"), F.last("v",True).over(w).alias('v')).show()
Run Code Online (Sandbox Code Playgroud)

结果:

+---+----+
|  k|   v|
+---+----+
|  b|   1|
|  b|   3|
|  a|null|
|  a|  -1|
|  a|   1|
+---+----+
Run Code Online (Sandbox Code Playgroud)

我认为应该是这样的:

+---+----+
|  k|   v|
+---+----+
|  b|   3|
|  b|   3|
|  a|   1|
|  a|   1|
|  a|   1|
+---+----+
Run Code Online (Sandbox Code Playgroud)

因为,我在'k'和'v'上通过orderBy的操作显示了df:

df.orderBy('k','v').show()
    +---+----+
    |  k|   v|
    +---+----+
    |  a|null|
    |  a|  -1|
    |  a|   1|
    |  b|   1|
    |  b|   3|
    +---+----+
Run Code Online (Sandbox Code Playgroud)

另外,我想出了另一个测试这类问题的解决方案,我的代码如下:

df.orderBy('k','v').groupBy('k').agg(F.first('v')).show()
Run Code Online (Sandbox Code Playgroud)

我发现每次运行它之后它的结果可能会有所不同.有人遇到过和我一样的经历吗?我希望在我的项目中使用这两个函数,但我发现这些解决方案尚无定论.

Wil*_*ill 10

尝试使用反转排序顺序.desc(),然后first()将提供所需的输出.

w2 = Window().partitionBy("k").orderBy(df.v.desc())
df.select(F.col("k"), F.first("v",True).over(w2).alias('v')).show()
F.first("v",True).over(w2).alias('v').show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+---+
|  k|  v|
+---+---+
|  b|  3|
|  b|  3|
|  a|  1|
|  a|  1|
|  a|  1|
+---+---+
Run Code Online (Sandbox Code Playgroud)

你应该注意partitionBy和orderBy.由于您使用'k'进行分区,因此任何给定窗口中的k的所有值都是相同的.按'k'排序什么都不做.

最后一个函数实际上与第一个函数不同,就它返回的窗口中的哪个项而言.它返回它看到的最后一个非null值,因为它在有序行中前进.

为了比较它们的效果,这里是一个包含功能/排序组合的数据帧.注意在列'last_w2'中,null值是如何被-1替换的.

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)]).toDF(["k", "v"])

#create two windows for comparison.
w = Window().partitionBy("k").orderBy('v')
w2 = Window().partitionBy("k").orderBy(df.v.desc())

df.select('k','v',
   F.first("v",True).over(w).alias('first_w1'),
   F.last("v",True).over(w).alias('last_w1'),
   F.first("v",True).over(w2).alias('first_w2'),
   F.last("v",True).over(w2).alias('last_w2')
).show()
Run Code Online (Sandbox Code Playgroud)

输出:

+---+----+--------+-------+--------+-------+
|  k|   v|first_w1|last_w1|first_w2|last_w2|
+---+----+--------+-------+--------+-------+
|  b|   1|       1|      1|       3|      1|
|  b|   3|       1|      3|       3|      3|
|  a|null|    null|   null|       1|     -1|
|  a|  -1|      -1|     -1|       1|     -1|
|  a|   1|      -1|      1|       1|      1|
+---+----+--------+-------+--------+-------+
Run Code Online (Sandbox Code Playgroud)


小智 6

看看问题 47130030
问题不在于 last() 函数,而在于框架,它只包含当前行之前的行。
使用

w = Window().partitionBy("k").orderBy('k','v').rowsBetween(W.unboundedPreceding,W.unboundedFollowing)
Run Code Online (Sandbox Code Playgroud)

将为 first() 和 last() 产生正确的结果。