Arj*_*jun 4 python user-defined-functions apache-spark apache-spark-sql pyspark
我试图在数据框中生成一个附加列,并根据全局值自动递增值。但是,所有行都是使用相同的值生成的,并且该值不会递增。
这是代码
def autoIncrement():
global rec
if (rec == 0) : rec = 1
else : rec = rec + 1
return int(rec)
rec=14
Run Code Online (Sandbox Code Playgroud)
UDF
autoIncrementUDF = udf(autoIncrement, IntegerType())
df1 = hiveContext.sql("select id,name,location,state,datetime,zipcode from demo.target")
df1.withColumn("id2", autoIncrementUDF()).show()
Run Code Online (Sandbox Code Playgroud)
这是结果 df
+---+------+--------+----------+-------------------+-------+---+
| id| name|location| state| datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00| NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00| NULL| 15|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00| NULL| 15|
| 30|Manish| Gurgoan| Gujarat|2018-03-27 11:00:00| NULL| 15|
+---+------+--------+----------+-------------------+-------+---+
Run Code Online (Sandbox Code Playgroud)
但我期待下面的结果
+---+------+--------+----------+-------------------+-------+---+
| id| name|location| state| datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00| NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00| NULL| 16|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00| NULL| 17|
| 30|Manish| Gurgoan| Gujarat|2018-03-27 11:00:00| NULL| 18|
+---+------+--------+----------+-------------------+-------+---+
Run Code Online (Sandbox Code Playgroud)
任何帮助表示赞赏。
全局变量绑定到 python 进程。AUDF可以在某个集群上的不同工作线程上并行执行,并且应该是确定性的。
您应该使用模块monotonically_increasing_id()中的函数pyspark.sql.functions。
检查文档以获取更多信息。
你应该小心,因为这个函数是动态的并且不具有粘性:
如何将行 id 的持久列添加到 Spark DataFrame?
| 归档时间: |
|
| 查看次数: |
14933 次 |
| 最近记录: |