自动 - 递增 pyspark 数据框列值

Arj*_*jun 4 python user-defined-functions apache-spark apache-spark-sql pyspark

我试图在数据框中生成一个附加列,并根据全局值自动递增值。但是,所有行都是使用相同的值生成的,并且该值不会递增。

这是代码

def autoIncrement():
    global rec
    if (rec == 0) : rec = 1 
    else : rec = rec + 1
    return int(rec)

rec=14
Run Code Online (Sandbox Code Playgroud)

UDF

autoIncrementUDF = udf(autoIncrement,  IntegerType())


df1 = hiveContext.sql("select id,name,location,state,datetime,zipcode from demo.target")

df1.withColumn("id2", autoIncrementUDF()).show()
Run Code Online (Sandbox Code Playgroud)

这是结果 df

+---+------+--------+----------+-------------------+-------+---+
| id|  name|location|     state|           datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00|   NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00|   NULL| 15|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00|   NULL| 15|
| 30|Manish| Gurgoan|   Gujarat|2018-03-27 11:00:00|   NULL| 15|
+---+------+--------+----------+-------------------+-------+---+
Run Code Online (Sandbox Code Playgroud)

但我期待下面的结果

+---+------+--------+----------+-------------------+-------+---+
| id|  name|location|     state|           datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00|   NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00|   NULL| 16|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00|   NULL| 17|
| 30|Manish| Gurgoan|   Gujarat|2018-03-27 11:00:00|   NULL| 18|
+---+------+--------+----------+-------------------+-------+---+
Run Code Online (Sandbox Code Playgroud)

任何帮助表示赞赏。

Sus*_*sio 5

全局变量绑定到 python 进程。AUDF可以在某个集群上的不同工作线程上并行执行,并且应该是确定性的。

您应该使用模块monotonically_increasing_id()中的函数pyspark.sql.functions

检查文档以获取更多信息

你应该小心,因为这个函数是动态的并且不具有粘性:

如何将行 id 的持久列添加到 Spark DataFrame?

  • 您的第一个链接已损坏。 (2认同)