Abh*_*til 4 python dataframe apache-spark apache-spark-sql pyspark
I have a PySpark DataFrame like this:
Id timestamp col1 col2
abc 789 0 1
def 456 1 0
abc 123 1 0
def 321 0 1
I want to group or partition by the Id column, and then build lists of col1 and col2 ordered by timestamp.
Id timestamp col1 col2
abc [123,789] [1,0] [0,1]
def [321,456] [0,1] [1,0]
My approach:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
window_spec = W.partitionBy("id").orderBy('timestamp')
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
df1 = df.withColumn("col1", F.collect_list("reco").over(window_spec))\
    .withColumn("col2", F.collect_list("score").over(window_spec))
df1.show()
But this does not return the lists for col1 and col2.
I don't think the order can be reliably preserved with a groupBy aggregation, so window functions seem to be the way to go.
Setup:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('abc', 789, 0, 1),
     ('def', 456, 1, 0),
     ('abc', 123, 1, 0),
     ('def', 321, 0, 1)],
    ['Id', 'timestamp', 'col1', 'col2'])
Script:
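# w1 builds the lists in ascending timestamp order;
# w2 ranks rows by descending timestamp so the latest row per Id gets _rn = 1
w1 = W.partitionBy('Id').orderBy('timestamp')
w2 = W.partitionBy('Id').orderBy(F.desc('timestamp'))
# Each row receives the list of values from the start of its partition up to itself
df = df.select(
    'Id',
    *[F.collect_list(c).over(w1).alias(c) for c in df.columns if c != 'Id']
)
# Keep only the row with the longest (complete) lists for each Id
df = (df
    .withColumn('_rn', F.row_number().over(w2))
    .filter('_rn=1')
    .drop('_rn')
)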
Result:
df.show()
# +---+----------+------+------+
# | Id| timestamp| col1| col2|
# +---+----------+------+------+
# |abc|[123, 789]|[1, 0]|[0, 1]|
# |def|[321, 456]|[0, 1]|[1, 0]|
# +---+----------+------+------+
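To see why the script above needs the row_number filter, and why the attempt in the question returned partial lists, it may help to emulate the window in plain Python. This is only a sketch, not Spark itself: `running_collect` and `last_per_id` are hypothetical helper names, and the emulation assumes timestamps are unique within each Id (Spark's default frame for an ordered window is range-based and would also include rows that tie on timestamp).

```python
from itertools import groupby
from operator import itemgetter

# Same rows as the question: (Id, timestamp, col1, col2)
rows = [
    ("abc", 789, 0, 1),
    ("def", 456, 1, 0),
    ("abc", 123, 1, 0),
    ("def", 321, 0, 1),
]


def running_collect(rows):
    """Emulate collect_list(c).over(partitionBy('Id').orderBy('timestamp')).

    With an ordering and no explicit frame, each row sees the rows from the
    start of its partition up to itself, so the lists grow row by row.
    """
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # partition by Id, order by timestamp
    for key, group in groupby(ordered, key=itemgetter(0)):
        seen = []
        for row in group:
            seen.append(row)
            out.append((
                key,
                [r[1] for r in seen],  # timestamps so far
                [r[2] for r in seen],  # col1 values so far
                [r[3] for r in seen],  # col2 values so far
            ))
    return out


def last_per_id(collected):
    """Keep the final row per Id, which is the one holding the complete lists."""
    latest = {}
    for row in collected:
        latest[row[0]] = row  # rows arrive in timestamp order, so the last one wins
    return list(latest.values())


running = running_collect(rows)
final = last_per_id(running)

for row in running:
    print(row)
# ('abc', [123], [1], [0])
# ('abc', [123, 789], [1, 0], [0, 1])
# ('def', [321], [0], [1])
# ('def', [321, 456], [0, 1], [1, 0])
```

Only the last row of each partition carries the full list, which is what filtering on `_rn = 1` over the descending window selects.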
You were also very close to what you needed. I played around with it, and this seems to work too:
window_spec = W.partitionBy("Id").orderBy('timestamp')
# Widen the frame to the whole partition so every row sees the complete, ordered list
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
df1 = (df
    .withColumn("timestamp", F.collect_list("timestamp").over(ranged_spec))
    .withColumn("col1", F.collect_list("col1").over(ranged_spec))
    .withColumn("col2", F.collect_list("col2").over(ranged_spec))
).drop_duplicates()  # rows within one Id are now identical, so collapse them
df1.show()