str*_*der 2 python apache-spark pyspark
我正在尝试在 DataFrame 中实现自动增量列。我已经找到了解决方案,但我想知道是否有更好的方法来做到这一点。
我正在使用monotonically_increasing_id()来自 的函数pyspark.sql.functions。问题是从 0 开始,而我希望它从 1 开始。
所以,我做了以下工作并且工作正常:
(F.monotonically_increasing_id()+1).alias("songplay_id")
dfLog.join(dfSong, (dfSong.artist_name == dfLog.artist) & (dfSong.title == dfLog.song))\
.select((F.monotonically_increasing_id()+1).alias("songplay_id"), \
dfLog.ts.alias("start_time"), dfLog.userId.alias("user_id"), \
dfLog.level, \
dfSong.song_id, \
dfSong.artist_id, \
dfLog.sessionId.alias("session_id"), \
dfLog.location, \
dfLog.userAgent.alias("user_agent"))
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来实现我想做的事情?我认为,为此实现 udf 函数的工作量太大,还是只有我一个人?
谢谢。-
不保证序列monotonically_increasing_id是连续的,但保证它们是单调递增的。您的作业的每个任务都会分配一个起始整数,每行都会加 1,但是一个批次的最后一个 id 和另一个批次的第一个 id 之间会有间隙。要验证此行为,您可以通过重新分区示例数据帧来创建包含两个任务的作业:
import pandas as pd
import pyspark.sql.functions as psf
spark.createDataFrame(pd.DataFrame([[i] for i in range(10)], columns=['value'])) \
.repartition(2) \
.withColumn('id', psf.monotonically_increasing_id()) \
.show()
+-----+----------+
|value| id|
+-----+----------+
| 3| 0|
| 0| 1|
| 6| 2|
| 2| 3|
| 4| 4|
| 7|8589934592|
| 5|8589934593|
| 8|8589934594|
| 9|8589934595|
| 1|8589934596|
+-----+----------+
Run Code Online (Sandbox Code Playgroud)
为了确保索引产生连续值,您可以使用窗口函数。
from pyspark.sql import Window
w = Window.orderBy('id')
spark.createDataFrame(pd.DataFrame([[i] for i in range(10)], columns=['value'])) \
.withColumn('id', psf.monotonically_increasing_id()) \
.withColumn('id2', psf.row_number().over(w)) \
.show()
+-----+---+---+
|value| id|id2|
+-----+---+---+
| 0| 0| 1|
| 1| 1| 2|
| 2| 2| 3|
| 3| 3| 4|
| 4| 4| 5|
| 5| 5| 6|
| 6| 6| 7|
| 7| 7| 8|
| 8| 8| 9|
| 9| 9| 10|
+-----+---+---+
Run Code Online (Sandbox Code Playgroud)
笔记:
monotonically_increasing_id允许您在读取行时设置行的顺序,它从0第一个任务开始并增加,但不一定按顺序方式row_number按顺序索引有序窗口中的行并从1| 归档时间: |
|
| 查看次数: |
5537 次 |
| 最近记录: |