向pyspark中的数据帧添加唯一的连续行号

vik*_*ana 1 csv dataframe rdd pyspark

我想在 pyspark 中将唯一的行号添加到我的数据框中,并且不想使用 monotonicallyIncreasingId 和 partitionBy 方法。我认为这个问题可能与之前提出的类似问题重复,仍在寻找一些建议,无论我的做法是否正确。以下是我的代码片段:我有一个包含以下输入记录集的 csv 文件:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000
Run Code Online (Sandbox Code Playgroud)

我已将此 csv 文件加载到数据框中。

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD=empRDD.map(lambda x: (list(x[0]) + [x[1]]))
 newRDD.take(2);
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]
Run Code Online (Sandbox Code Playgroud)

当我将 int 值包含到我的列表中时,我丢失了数据帧架构。

newdf=newRDD.toDF(['emp_id','emp_name','emp_city','emp_salary','row_id'])
newdf.show();

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+
Run Code Online (Sandbox Code Playgroud)

我这样做对吗?或者有没有更好的方法在 pyspark 中添加或保留数据帧的模式?

使用 zipWithIndex 方法为大尺寸数据框添加唯一的连续行号是否可行?我们可以使用这个 row_id 重新分区数据帧以在分区之间均匀分布数据吗?

vik*_*ana 9

我找到了一个解决方案,而且非常简单。因为我的数据框中没有列在所有行中都具有相同的值,所以在将 row_number 与 partitionBy 子句一起使用时,它不会生成唯一的行号。

让我们向现有数据框中添加一个新列,其中包含一些默认值。

emp_df= emp_df.withColumn("new_column",lit("ABC"))
Run Code Online (Sandbox Code Playgroud)

并使用该列“new_column”创建一个带有 parition 的窗口函数

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")
Run Code Online (Sandbox Code Playgroud)

你会得到想要的结果:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+
Run Code Online (Sandbox Code Playgroud)

  • 更简单的方法: withColumn("index",F.row_number().over(Window.orderBy(monotically_increasing_id()))-1) (6认同)
  • 这些解决方案将数据集移动到单个分区。这大多是你真正想要避免的事情 (4认同)