我正在使用 pyspark 创建一个数据框,如下所示:
+----+------+
| k| v|
+----+------+
|key1|value1|
|key1|value1|
|key1|value1|
|key2|value1|
|key2|value1|
|key2|value1|
+----+------+
Run Code Online (Sandbox Code Playgroud)
我想使用 'withColumn' 方法添加一个 'rowNum' 列,数据框的结果更改如下:
+----+------+------+
| k| v|rowNum|
+----+------+------+
|key1|value1| 1|
|key1|value1| 2|
|key1|value1| 3|
|key2|value1| 4|
|key2|value1| 5|
|key2|value1| 6|
+----+------+------+
Run Code Online (Sandbox Code Playgroud)
rowNum 的范围是从 1 到 n,n 等于原始数据的数量。我修改了我的代码,如下所示:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy("v").orderBy('k')
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w))
Run Code Online (Sandbox Code Playgroud)
但是,我收到错误消息:
'module' object has no attribute 'rowNumber'
Run Code Online (Sandbox Code Playgroud)
我用row_number替换了rowNumber()方法,上面的代码就可以运行了。但是,当我运行代码时:
my_df.show()
Run Code Online (Sandbox Code Playgroud)
我再次收到错误消息:
Py4JJavaError: An error occurred while calling o898.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
Run Code Online (Sandbox Code Playgroud)
Spark 2.2中的解决方案:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("rowNum", row_number().over(w))
Run Code Online (Sandbox Code Playgroud)
如果您需要一个rowNum
从 1 到 n 的连续值,而不是 a,monotonically_increasing_id
您可以使用zipWithIndex()
重新创建示例数据,如下所示:
rdd = sc.parallelize([('key1','value1'),
('key1','value1'),
('key1','value1'),
('key1','value1'),
('key1','value1'),
('key1','value1')])
Run Code Online (Sandbox Code Playgroud)
然后,您可以使用zipWithIndex()
为每一行添加索引。用于map
重新格式化数据并向索引添加 1,使其从 1 开始。
rdd_indexed = rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1))
df = rdd_indexed.toDF(['id','score','rowNum'])
df.show()
+----+------+------+
| id| score|rowNum|
+----+------+------+
|key1|value1| 1|
|key1|value1| 2|
|key1|value1| 3|
|key1|value1| 4|
|key1|value1| 5|
|key1|value1| 6|
+----+------+------+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6762 次 |
最近记录: |