使用monotonically_increasing_id()将行号分配给pyspark数据帧

mun*_*uni 21 python indexing merge pyspark

我使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:

df1 = df1.withColumn("idx", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)

现在df1有26,572,528条记录.所以我期待idx值从0-26,572,527.

但是当我选择max(idx)时,它的值非常大:335,008,054,165.

这个功能发生了什么?使用此函数与具有相似记录数的其他数据集合并是否可靠?

我有大约300个数据帧,我想将它们组合成一个数据帧.因此,一个数据帧包含ID,而其他数据帧包含与行对应的不同记录

mka*_*ran 24

文档中

生成单调递增的64位整数的列.

生成的ID保证单调增加且唯一,但不是连续的.当前实现将分区ID置于高31位,并将每个分区内的记录号置于低33位.假设数据框的分区少于10亿,每个分区的记录少于80亿.

因此,它不像RDB中的自动增量ID,并且它可靠用于合并.

如果您需要像RDB中那样的自动增量行为,并且您的数据是可排序的,那么您可以使用 row_number

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by "some_column") as num, * from df')
+---+-----------+
|num|some_column|
+---+-----------+
|  1|   ....... |
|  2|   ....... |
|  3| ..........|
+---+-----------+
Run Code Online (Sandbox Code Playgroud)

如果您的数据不可排序,并且您不介意使用rdds创建索引然后回退到数据帧,则可以使用 rdd.zipWithIndex()

这里可以找到一个例子

简而言之:

# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()

df.show()

# your data           | indexes
+---------------------+---+
|         _1          | _2| 
+-----------=---------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+
Run Code Online (Sandbox Code Playgroud)

在此之后,您可能需要进行一些更改,以使您的数据框符合您的需要.注意:不是一个非常高效的解决方案.

希望这可以帮助.祝好运!

编辑: 来考虑一下,你可以结合monotonically_increasing_id使用row_number:

# create a monotonically increasing id 
df = df.withColumn("idx", monotonically_increasing_id())

# then since the id is increasing but not consecutive, it means you can sort by it, so you can use the `row_number`
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by "idx") as num, * from df')
Run Code Online (Sandbox Code Playgroud)

虽然不确定性能.


Ram*_*jan 19

使用api函数,您可以简单地执行以下操作

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
df1 = df1.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("idx", F.row_number().over(windowSpec)).show()
Run Code Online (Sandbox Code Playgroud)

我希望答案是有帮助的

  • 请仔细检查答案,因为它会将所有行移动到单个分区中(这可能会导致OOM). (8认同)
  • @JeffEvans因为窗口规范不使用任何“partitionBy”并且仅对“row_number()”使用一个分区。 (3认同)