如何对pyspark数据帧中的单列进行整形操作?

Sag*_*rys 2 numpy pandas apache-spark apache-spark-sql pyspark

我有一个很长的 pyspark 数据框,如下所示:

+------+
|number|
+------+
|12.4  |
|13.4  |
|42.3  |
|33.4  |
|42.3  |
|32.4  |
|44.2  |
|12.3  |
|45.4  |
+------+
Run Code Online (Sandbox Code Playgroud)

理想情况下,我希望将其重塑为nxn矩阵,其中nis sqrt(length of pyspark dataframe)

虽然有一个解决方案是将其转换为一个 numpy 数组,然后将其重塑为nxn矩阵,但我希望在 pyspark 中完成。因为我的数据超长(大概一亿行)。

所以我正在寻找的预期输出是这样的:

+------+------+------+
|12.4  | 13.4 | 42.3 |
|33.4  | 42.3 | 32.4 |
|44.2  | 12.3 | 45.4 |
+------+------+------+

Run Code Online (Sandbox Code Playgroud)

虽然我能够通过将其转换为熊猫然后转换为 numpy 然后进行重塑操作来正确地做到这一点。但我想在 Pyspark 本身中进行这种转换。因为下面的代码仅适用于几千行。

covarianceMatrix_pd = covarianceMatrix_df.toPandas()
nrows = np.sqrt(len(covarianceMatrix_pd))
covarianceMatrix_pd = covarianceMatrix_pd.to_numpy().reshape((int(nrows),int(nrows)))
covarianceMatrix_pd
Run Code Online (Sandbox Code Playgroud)

ank*_*_91 5

一种方法是row_number在我们对数据帧进行计数后使用with pivot :

from pyspark.sql import functions as F, Window
from math import sqrt
Run Code Online (Sandbox Code Playgroud)
c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))

out = (df.withColumn("Rnum",((rnum-1)/c).cast("Integer"))
 .withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("number")))
Run Code Online (Sandbox Code Playgroud)
out.show()

+----+----+----+----+
|Rnum|   1|   2|   3|
+----+----+----+----+
|   0|12.4|13.4|42.3|
|   1|33.4|42.3|32.4|
|   2|44.2|12.3|45.4|
+----+----+----+----+
Run Code Online (Sandbox Code Playgroud)

  • @VishalAnand 顺序可能会改变,但是 Rnum 分配正确吗?如果是,那么这应该可行。我在社区版本中测试了你的示例并且它有效 (2认同)